Skip to content

Commit

Permalink
Revert "Revert "Added Hierarchical Partition Key Support" (Azure#24806)"
Browse files Browse the repository at this point in the history
This reverts commit 91272c9.
  • Loading branch information
v1k1 committed Apr 3, 2023
1 parent 90f8b8a commit 231113b
Show file tree
Hide file tree
Showing 26 changed files with 1,811 additions and 859 deletions.
35 changes: 21 additions & 14 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class ClientContext {
batch<T>({ body, path, partitionKey, resourceId, options, }: {
body: T;
path: string;
partitionKey: string;
partitionKey: PartitionKey;
resourceId: string;
options?: RequestOptions;
}): Promise<Response_2<any>>;
Expand Down Expand Up @@ -590,7 +590,7 @@ export interface CreateOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Create;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: JSONObject;
}
Expand Down Expand Up @@ -700,7 +700,7 @@ export interface DeleteOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Delete;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
}

// @public (undocumented)
Expand Down Expand Up @@ -740,8 +740,10 @@ export type ExistingKeyOperation = {
path: string;
};

// @public (undocumented)
export function extractPartitionKey(document: unknown, partitionKeyDefinition: PartitionKeyDefinition): PartitionKey[];
// Warning: (ae-forgotten-export) The symbol "PartitionKeyInternal" needs to be exported by the entry point index.d.ts
//
// @public
export function extractPartitionKey(document: unknown, partitionKeyDefinition?: PartitionKeyDefinition): PartitionKeyInternal | undefined;

// @public
export interface FeedOptions extends SharedOptions {
Expand All @@ -758,7 +760,7 @@ export interface FeedOptions extends SharedOptions {
forceQueryPlan?: boolean;
maxDegreeOfParallelism?: number;
maxItemCount?: number;
partitionKey?: any;
partitionKey?: PartitionKey;
populateIndexMetrics?: boolean;
populateQueryMetrics?: boolean;
useIncrementalFeed?: boolean;
Expand Down Expand Up @@ -879,7 +881,7 @@ export enum IndexKind {

// @public
export class Item {
constructor(container: Container, id: string, partitionKey: PartitionKey, clientContext: ClientContext);
constructor(container: Container, id: string, clientContext: ClientContext, partitionKey?: PartitionKey);
// (undocumented)
readonly container: Container;
delete<T extends ItemDefinition = any>(options?: RequestOptions): Promise<ItemResponse<T>>;
Expand Down Expand Up @@ -909,7 +911,7 @@ export class ItemResponse<T extends ItemDefinition> extends ResourceResponse<T &
// @public
export class Items {
constructor(container: Container, clientContext: ClientContext);
batch(operations: OperationInput[], partitionKey?: string, options?: RequestOptions): Promise<Response_2<OperationResponse[]>>;
batch(operations: OperationInput[], partitionKey?: PartitionKey, options?: RequestOptions): Promise<Response_2<OperationResponse[]>>;
bulk(operations: OperationInput[], bulkOptions?: BulkOptions, options?: RequestOptions): Promise<OperationResponse[]>;
changeFeed(partitionKey: string | number | boolean, changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
changeFeed(changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
Expand Down Expand Up @@ -1085,15 +1087,20 @@ export interface PartitionedQueryExecutionInfo {
queryRanges: QueryRange[];
}

// Warning: (ae-forgotten-export) The symbol "PrimitivePartitionKeyValue" needs to be exported by the entry point index.d.ts
//
// @public (undocumented)
export type PartitionKey = PartitionKeyDefinition | string | number | unknown;
export type PartitionKey = PrimitivePartitionKeyValue | PrimitivePartitionKeyValue[];

// @public (undocumented)
export interface PartitionKeyDefinition {
// Warning: (ae-forgotten-export) The symbol "PartitionKeyKind" needs to be exported by the entry point index.d.ts
kind?: PartitionKeyKind;
paths: string[];
// (undocumented)
systemKey?: boolean;
version?: number;
// Warning: (ae-forgotten-export) The symbol "PartitionKeyDefinitionVersion" needs to be exported by the entry point index.d.ts
version?: PartitionKeyDefinitionVersion;
}

// @public (undocumented)
Expand Down Expand Up @@ -1138,7 +1145,7 @@ export interface PatchOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Patch;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: PatchRequestBody;
}
Expand Down Expand Up @@ -1392,7 +1399,7 @@ export interface ReadOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Read;
// (undocumented)
partitionKey?: string | number | boolean | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
}

// @public (undocumented)
Expand All @@ -1418,7 +1425,7 @@ export interface ReplaceOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Replace;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: JSONObject;
}
Expand Down Expand Up @@ -2027,7 +2034,7 @@ export interface UpsertOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Upsert;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: JSONObject;
}
Expand Down
18 changes: 14 additions & 4 deletions sdk/cosmosdb/cosmos/src/ClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import { Constants, HTTPMethod, OperationType, ResourceType } from "./common/con
import { getIdFromLink, getPathFromLink, parseLink } from "./common/helper";
import { StatusCodes, SubStatusCodes } from "./common/statusCodes";
import { Agent, CosmosClientOptions } from "./CosmosClientOptions";
import { ConnectionPolicy, ConsistencyLevel, DatabaseAccount, PartitionKey } from "./documents";
import {
ConnectionPolicy,
ConsistencyLevel,
DatabaseAccount,
PartitionKey,
convertToInternalPartitionKey,
} from "./documents";
import { GlobalEndpointManager } from "./globalEndpointManager";
import { PluginConfig, PluginOn, executePlugins } from "./plugins/Plugin";
import { FetchFunctionCallback, SqlQuerySpec } from "./queryExecutionContext";
Expand Down Expand Up @@ -602,7 +608,7 @@ export class ClientContext {
}: {
body: T;
path: string;
partitionKey: string;
partitionKey: PartitionKey;
resourceId: string;
options?: RequestOptions;
}): Promise<Response<any>> {
Expand Down Expand Up @@ -759,12 +765,16 @@ export class ClientContext {
options: requestContext.options,
partitionKeyRangeId: requestContext.partitionKeyRangeId,
useMultipleWriteLocations: this.connectionPolicy.useMultipleWriteLocations,
partitionKey: requestContext.partitionKey,
partitionKey:
requestContext.partitionKey !== undefined
? convertToInternalPartitionKey(requestContext.partitionKey)
: undefined, // TODO: Move this check from here to PartitionKey
});
}

/**
* Returns collection of properties which are derived from the context for Request Creation
* Returns collection of properties which are derived from the context for Request Creation.
* These properties have client wide scope, as opposed to request specific scope.
* @returns
*/
private getContextDerivedPropsForRequestCreation(): {
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmosdb/cosmos/src/client/Container/Container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class Container {
* `const {body: replacedItem} = await container.item("<item id>", "<partition key value>").replace({id: "<item id>", title: "Updated post", authorID: 5});`
*/
public item(id: string, partitionKeyValue?: PartitionKey): Item {
return new Item(this, id, partitionKeyValue, this.clientContext);
return new Item(this, id, this.clientContext, partitionKeyValue);
}

/**
Expand Down
11 changes: 6 additions & 5 deletions sdk/cosmosdb/cosmos/src/client/Item/Item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
ResourceType,
StatusCodes,
} from "../../common";
import { PartitionKey } from "../../documents";
import { PartitionKey, PartitionKeyInternal, convertToInternalPartitionKey } from "../../documents";
import { extractPartitionKey, undefinedPartitionKey } from "../../extractPartitionKey";
import { RequestOptions, Response } from "../../request";
import { PatchRequestBody } from "../../utils/patch";
Expand All @@ -24,7 +24,7 @@ import { ItemResponse } from "./ItemResponse";
* @see {@link Items} for operations on all items; see `container.items`.
*/
export class Item {
private partitionKey: PartitionKey;
private partitionKey: PartitionKeyInternal;
/**
* Returns a reference URL to the resource. Used for linking in Permissions.
*/
Expand All @@ -41,10 +41,11 @@ export class Item {
constructor(
public readonly container: Container,
public readonly id: string,
partitionKey: PartitionKey,
private readonly clientContext: ClientContext
private readonly clientContext: ClientContext,
partitionKey?: PartitionKey
) {
this.partitionKey = partitionKey;
this.partitionKey =
partitionKey === undefined ? undefined : convertToInternalPartitionKey(partitionKey);
}

/**
Expand Down
77 changes: 52 additions & 25 deletions sdk/cosmosdb/cosmos/src/client/Item/Items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ import { ItemResponse } from "./ItemResponse";
import {
Batch,
isKeyInRange,
Operation,
getPartitionKeyToHash,
decorateOperation,
prepareOperations,
OperationResponse,
OperationInput,
BulkOptions,
decorateBatchOperation,
splitBatchBasedOnBodySize,
} from "../../utils/batch";
import { hashV1PartitionKey } from "../../utils/hashing/v1";
import { hashV2PartitionKey } from "../../utils/hashing/v2";
import { assertNotUndefined } from "../../utils/typeChecks";
import { hashPartitionKey } from "../../utils/hashing/hash";
import { PartitionKey, PartitionKeyDefinition } from "../../documents";

/**
* @hidden
Expand Down Expand Up @@ -289,8 +288,8 @@ export class Items {
const ref = new Item(
this.container,
(response.result as any).id,
partitionKey,
this.clientContext
this.clientContext,
partitionKey
);
return new ItemResponse(
response.result,
Expand Down Expand Up @@ -361,8 +360,8 @@ export class Items {
const ref = new Item(
this.container,
(response.result as any).id,
partitionKey,
this.clientContext
this.clientContext,
partitionKey
);
return new ItemResponse(
response.result,
Expand Down Expand Up @@ -409,7 +408,8 @@ export class Items {
const { resources: partitionKeyRanges } = await this.container
.readPartitionKeyRanges()
.fetchAll();
const { resource: definition } = await this.container.getPartitionKeyDefinition();
const { resource } = await this.container.readPartitionKeyDefinition();
const partitionDefinition = assertNotUndefined(resource, "PartitionKeyDefinition.");
const batches: Batch[] = partitionKeyRanges.map((keyRange: PartitionKeyRange) => {
return {
min: keyRange.minInclusive,
Expand All @@ -419,19 +419,8 @@ export class Items {
operations: [],
};
});
operations
.map((operation) => decorateOperation(operation, definition, options))
.forEach((operation: Operation, index: number) => {
const partitionProp = definition.paths[0].replace("/", "");
const isV2 = definition.version && definition.version === 2;
const toHashKey = getPartitionKeyToHash(operation, partitionProp);
const hashed = isV2 ? hashV2PartitionKey(toHashKey) : hashV1PartitionKey(toHashKey);
const batchForKey = batches.find((batch: Batch) => {
return isKeyInRange(batch.min, batch.max, hashed);
});
batchForKey.operations.push(operation);
batchForKey.indexes.push(index);
});

this.groupOperationsBasedOnPartitionKey(operations, partitionDefinition, options, batches);

const path = getPathFromLink(this.container.url, ResourceType.item);

Expand Down Expand Up @@ -462,7 +451,8 @@ export class Items {
// partition key types as well since we don't support them, so for now we throw
if (err.code === 410) {
throw new Error(
"Partition key error. Either the partitions have split or an operation has an unsupported partitionKey type"
"Partition key error. Either the partitions have split or an operation has an unsupported partitionKey type" +
err.message
);
}
throw new Error(`Bulk request errored with: ${err.message}`);
Expand All @@ -472,6 +462,43 @@ export class Items {
return orderedResponses;
}

/**
* Function to create batches based of partition key Ranges.
* @param operations - operations to group
* @param partitionDefinition - PartitionKey definition of container.
* @param options - Request options for bulk request.
* @param batches - Groups to be filled with operations.
*/
private groupOperationsBasedOnPartitionKey(
operations: OperationInput[],
partitionDefinition: PartitionKeyDefinition,
options: RequestOptions | undefined,
batches: Batch[]
) {
operations.forEach((operationInput, index: number) => {
const { operation, partitionKey } = prepareOperations(
operationInput,
partitionDefinition,
options
);
const hashed = hashPartitionKey(
assertNotUndefined(
partitionKey,
"undefined value for PartitionKey not expected during grouping of bulk operations."
),
partitionDefinition
);
const batchForKey = assertNotUndefined(
batches.find((batch: Batch) => {
return isKeyInRange(batch.min, batch.max, hashed);
}),
"No suitable Batch found."
);
batchForKey.operations.push(operation);
batchForKey.indexes.push(index);
});
}

/**
* Execute transactional batch operations on items.
*
Expand Down Expand Up @@ -501,7 +528,7 @@ export class Items {
*/
public async batch(
operations: OperationInput[],
partitionKey: string = "[{}]",
partitionKey?: PartitionKey,
options?: RequestOptions
): Promise<Response<OperationResponse[]>> {
operations.map((operation) => decorateBatchOperation(operation, options));
Expand Down
Loading

0 comments on commit 231113b

Please sign in to comment.