Skip to content

Commit

Permalink
feat: add compression for dictionaries (#1223)
Browse files Browse the repository at this point in the history
Adds compression support for dictionaries, specifically on dictionarySetField, dictionarySetFields, dictionaryGetField, dictionaryGetFields, and dictionaryFetch. We add integration tests on the compression library and refactor the library as well to handle comparing Uint8Arrays better.

In a future PR we ought to refactor the backend to reduce code duplication. With the integration tests in place now, that should be less risky.
  • Loading branch information
malandis authored Apr 10, 2024
1 parent 0fcaf9e commit 799a868
Show file tree
Hide file tree
Showing 7 changed files with 1,015 additions and 112 deletions.

Large diffs are not rendered by default.

220 changes: 198 additions & 22 deletions packages/client-sdk-nodejs/src/internal/cache-data-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,14 @@ import {grpcChannelOptionsFromGrpcConfig} from './grpc/grpc-channel-options';
import {ConnectionError} from '@gomomento/sdk-core/dist/src/errors';
import {common} from '@gomomento/generated-types/dist/common';
import {
DictionaryFetchCallOptions,
DictionaryGetFieldCallOptions,
DictionaryGetFieldsCallOptions,
DictionarySetFieldCallOptions,
DictionarySetFieldsCallOptions,
GetCallOptions,
SetCallOptions,
ttlOrFromCacheTtl,
} from '@gomomento/sdk-core/dist/src/utils';
import grpcCache = cache.cache_client;
import ECacheResult = cache_client.ECacheResult;
Expand Down Expand Up @@ -1447,7 +1453,7 @@ export class CacheDataClient implements IDataClient {
resolve(
new CacheGet.Error(
new InvalidArgumentError(
'Compressor is not set, but decompress was configured; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
'Compressor is not set, but `CacheClient.get` was called with the `decompress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
)
)
);
Expand Down Expand Up @@ -2420,7 +2426,8 @@ export class CacheDataClient implements IDataClient {

public async dictionaryFetch(
cacheName: string,
dictionaryName: string
dictionaryName: string,
options?: DictionaryFetchCallOptions
): Promise<CacheDictionaryFetch.Response> {
try {
validateCacheName(cacheName);
Expand All @@ -2439,7 +2446,8 @@ export class CacheDataClient implements IDataClient {
);
const result = await this.sendDictionaryFetch(
cacheName,
this.convert(dictionaryName)
this.convert(dictionaryName),
options
);
this.logger.trace(
`'dictionaryFetch' request result: ${result.toString()}`
Expand All @@ -2452,7 +2460,8 @@ export class CacheDataClient implements IDataClient {

private async sendDictionaryFetch(
cacheName: string,
dictionaryName: Uint8Array
dictionaryName: Uint8Array,
options?: DictionaryFetchCallOptions
): Promise<CacheDictionaryFetch.Response> {
const request = new grpcCache._DictionaryFetchRequest({
dictionary_name: dictionaryName,
Expand All @@ -2467,7 +2476,48 @@ export class CacheDataClient implements IDataClient {
},
(err, resp) => {
if (resp?.found) {
resolve(new CacheDictionaryFetch.Hit(resp.found.items));
if (!options?.decompress) {
resolve(new CacheDictionaryFetch.Hit(resp.found.items));
} else {
if (this.valueCompressor === undefined) {
resolve(
new CacheDictionaryFetch.Error(
new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionaryFetch` was called with the `decompress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
)
)
);
} else {
const decompressedItemPromises = resp.found.items.map(item => {
// This check shouldn't be necessary given the one in the outer scope,
// but the TypeScript compiler doesn't seem to understand that.
if (this.valueCompressor === undefined) {
throw new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionaryFetch` was called with the `decompress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
);
}

return this.valueCompressor
.decompressIfCompressed(item.value)
.then(v => {
item.value = v;
return item;
})
.catch(e => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new InvalidArgumentError(`${e}`);
});
});

Promise.all(decompressedItemPromises)
.then(items => {
resolve(new CacheDictionaryFetch.Hit(items));
})
.catch((e: InvalidArgumentError) => {
resolve(new CacheDictionaryFetch.Error(e));
});
}
}
} else if (resp?.missing) {
resolve(new CacheDictionaryFetch.Miss());
} else {
Expand All @@ -2488,7 +2538,7 @@ export class CacheDataClient implements IDataClient {
dictionaryName: string,
field: string | Uint8Array,
value: string | Uint8Array,
ttl: CollectionTtl = CollectionTtl.fromCacheTtl()
options?: DictionarySetFieldCallOptions
): Promise<CacheDictionarySetField.Response> {
try {
validateCacheName(cacheName);
Expand All @@ -2500,19 +2550,41 @@ export class CacheDataClient implements IDataClient {
);
}

const ttl = ttlOrFromCacheTtl(options);

let encodedValue = this.convert(value);
if (options?.compress) {
this.logger.trace(
'CacheClient.dictionarySetField; compression enabled, calling value compressor'
);
if (this.valueCompressor === undefined) {
return this.cacheServiceErrorMapper.returnOrThrowError(
new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionarySetField` was called with the `compress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
),
err => new CacheDictionarySetField.Error(err)
);
}
encodedValue = await this.valueCompressor.compress(
this.configuration.getCompressionStrategy()?.compressionLevel ??
CompressionLevel.Balanced,
encodedValue
);
}

try {
await this.requestConcurrencySemaphore.acquire();
this.logger.trace(
`Issuing 'dictionarySetField' request; field: ${field.toString()}, value length: ${
value.length
encodedValue.length
}, ttl: ${ttl.ttlSeconds.toString() ?? 'null'}`
);

const result = await this.sendDictionarySetField(
cacheName,
this.convert(dictionaryName),
this.convert(field),
this.convert(value),
encodedValue,
ttl.ttlMilliseconds() || this.defaultTtlSeconds * 1000,
ttl.refreshTtl()
);
Expand Down Expand Up @@ -2570,7 +2642,7 @@ export class CacheDataClient implements IDataClient {
| Map<string | Uint8Array, string | Uint8Array>
| Record<string, string | Uint8Array>
| Array<[string, string | Uint8Array]>,
ttl: CollectionTtl = CollectionTtl.fromCacheTtl()
options?: DictionarySetFieldsCallOptions
): Promise<CacheDictionarySetFields.Response> {
try {
validateCacheName(cacheName);
Expand All @@ -2582,6 +2654,8 @@ export class CacheDataClient implements IDataClient {
);
}

const ttl = ttlOrFromCacheTtl(options);

try {
await this.requestConcurrencySemaphore.acquire();
this.logger.trace(
Expand All @@ -2591,6 +2665,26 @@ export class CacheDataClient implements IDataClient {
);

const dictionaryFieldValuePairs = this.convertElements(elements);
if (options?.compress) {
this.logger.trace(
'CacheClient.dictionarySetFields; compression enabled, calling value compressor'
);
if (this.valueCompressor === undefined) {
return this.cacheServiceErrorMapper.returnOrThrowError(
new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionarySetFields` was called with the `compress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
),
err => new CacheDictionarySetFields.Error(err)
);
}
for (const pair of dictionaryFieldValuePairs) {
pair.value = await this.valueCompressor.compress(
this.configuration.getCompressionStrategy()?.compressionLevel ??
CompressionLevel.Balanced,
pair.value
);
}
}

const result = await this.sendDictionarySetFields(
cacheName,
Expand Down Expand Up @@ -2649,7 +2743,8 @@ export class CacheDataClient implements IDataClient {
public async dictionaryGetField(
cacheName: string,
dictionaryName: string,
field: string | Uint8Array
field: string | Uint8Array,
options?: DictionaryGetFieldCallOptions
): Promise<CacheDictionaryGetField.Response> {
try {
validateCacheName(cacheName);
Expand All @@ -2669,7 +2764,8 @@ export class CacheDataClient implements IDataClient {
const result = await this.sendDictionaryGetField(
cacheName,
this.convert(dictionaryName),
this.convert(field)
this.convert(field),
options
);
this.logger.trace(
`'dictionaryGetField' request result: ${result.toString()}`
Expand All @@ -2683,7 +2779,8 @@ export class CacheDataClient implements IDataClient {
private async sendDictionaryGetField(
cacheName: string,
dictionaryName: Uint8Array,
field: Uint8Array
field: Uint8Array,
options?: DictionaryGetFieldCallOptions
): Promise<CacheDictionaryGetField.Response> {
const request = new grpcCache._DictionaryGetRequest({
dictionary_name: dictionaryName,
Expand Down Expand Up @@ -2716,12 +2813,40 @@ export class CacheDataClient implements IDataClient {
) {
resolve(new CacheDictionaryGetField.Miss(field));
} else {
resolve(
new CacheDictionaryGetField.Hit(
resp?.found.items[0].cache_body,
field
)
);
if (!options?.decompress) {
resolve(
new CacheDictionaryGetField.Hit(
resp?.found.items[0].cache_body,
field
)
);
} else {
if (this.valueCompressor === undefined) {
resolve(
new CacheDictionaryGetField.Error(
new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionaryGetField` was called with the `decompress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
),
field
)
);
} else {
this.valueCompressor
.decompressIfCompressed(resp?.found.items[0].cache_body)
.then(v => {
resolve(new CacheDictionaryGetField.Hit(v, field));
})
.catch(e =>
resolve(
new CacheDictionaryGetField.Error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
new InvalidArgumentError(`${e}`),
field
)
)
);
}
}
}
} else {
this.cacheServiceErrorMapper.resolveOrRejectError({
Expand All @@ -2740,7 +2865,8 @@ export class CacheDataClient implements IDataClient {
public async dictionaryGetFields(
cacheName: string,
dictionaryName: string,
fields: string[] | Uint8Array[]
fields: string[] | Uint8Array[],
options?: DictionaryGetFieldsCallOptions
): Promise<CacheDictionaryGetFields.Response> {
try {
validateCacheName(cacheName);
Expand All @@ -2760,7 +2886,8 @@ export class CacheDataClient implements IDataClient {
const result = await this.sendDictionaryGetFields(
cacheName,
this.convert(dictionaryName),
this.convertArray(fields)
this.convertArray(fields),
options
);
this.logger.trace(
`'dictionaryGetFields' request result: ${result.toString()}`
Expand All @@ -2774,7 +2901,8 @@ export class CacheDataClient implements IDataClient {
private async sendDictionaryGetFields(
cacheName: string,
dictionaryName: Uint8Array,
fields: Uint8Array[]
fields: Uint8Array[],
options?: DictionaryGetFieldsCallOptions
): Promise<CacheDictionaryGetFields.Response> {
const request = new grpcCache._DictionaryGetRequest({
dictionary_name: dictionaryName,
Expand All @@ -2795,7 +2923,55 @@ export class CacheDataClient implements IDataClient {
const result = this.convertECacheResult(item.result);
return new _DictionaryGetResponsePart(result, item.cache_body);
});
resolve(new CacheDictionaryGetFields.Hit(items, fields));

if (!options?.decompress) {
resolve(new CacheDictionaryGetFields.Hit(items, fields));
} else {
if (this.valueCompressor === undefined) {
resolve(
new CacheDictionaryGetFields.Error(
new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionaryGetFields` was called with the `decompress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
)
)
);
} else {
const decompressedItemPromises = items.map(item => {
if (item.result === _ECacheResult.Miss) {
return Promise.resolve(item);
}

// This check shouldn't be necessary given the one in the outer scope,
// but the TypeScript compiler doesn't seem to understand that.
if (this.valueCompressor === undefined) {
throw new InvalidArgumentError(
'Compressor is not set, but `CacheClient.dictionaryGetFields` was called with the `decompress` option; please install @gomomento/sdk-nodejs-compression and call `Configuration.withCompressionStrategy` to enable compression.'
);
}

return this.valueCompressor
.decompressIfCompressed(item.cacheBody)
.then(v => new _DictionaryGetResponsePart(item.result, v))
.catch(e => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new InvalidArgumentError(`${e}`);
});
});

Promise.all(decompressedItemPromises)
.then(decompressedItems => {
resolve(
new CacheDictionaryGetFields.Hit(
decompressedItems,
fields
)
);
})
.catch((e: InvalidArgumentError) => {
resolve(new CacheDictionaryGetFields.Error(e));
});
}
}
} else if (resp?.dictionary === 'missing') {
resolve(new CacheDictionaryGetFields.Miss());
} else {
Expand Down
Loading

0 comments on commit 799a868

Please sign in to comment.