Skip to content

Commit

Permalink
init improvements, add index options, multiple binds per filter
Browse files Browse the repository at this point in the history
resolving initialization issues, adding
  • Loading branch information
mieslep committed Dec 14, 2023
1 parent 1577a86 commit 0ede565
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 37 deletions.
51 changes: 43 additions & 8 deletions docs/core_docs/docs/integrations/vectorstores/cassandra.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,26 @@ vector store is first created, and specifed in the constructor parameter `vector
...
```

## Non-Equality Filters
## Indices

With Version 5, Cassandra introduced Storage Attached Indexes, or SAIs. These allow `WHERE` filtering without specifying
the partition key, and allow for additional operator types such as non-equalities. You can define these with the `indices`
parameter, which accepts zero or more dictionaries each containing `name` and `value` entries.

Indices are optional, though required if using filtered queries on non-partition columns.

- The `name` entry is part of the object name; on a table named `test_table` an index with `name: "some_column"`
would be `idx_test_table_some_column`.
- The `value` entry is the column on which the index is created, surrounded by `(` and `)`. With the above column
`some_column` it would be specified as `value: "(some_column)"`.
- An optional `options` entry is a map passed to the `WITH OPTIONS =` clause of the `CREATE CUSTOM INDEX` statement.
The specific entries on this map are index type specific.

```typescript
indices: [{ name: "some_column", value: "(some_column)" }],
```

## Advanced Filtering

By default, filters are applied with an equality `=`. For those fields that have an `indices` entry, you may
provide an `operator` with a string of a value supported by the index; in this case, you specify one or
Expand All @@ -115,6 +134,24 @@ or
];
```

`value` can be a single value or an array. If it is not an array, or there is only one element in `value`,
the resulting query will be along the lines of `${name} ${operator} ?` with `value` bound to the `?`.

If there is more than one element in the `value` array, the number of unquoted `?` in `name` are counted
and subtracted from the length of `value`, and this number of `?` is put on the right side of the operator;
if there are more than one `?` then they will be encapsulated in `(` and `)`, e.g. `(?, ?, ?)`.

This faciliates bind values on the left of the operator, which is useful for some functions; for example
a geo-distance filter:

```typescript
{
name: "GEO_DISTANCE(coord, ?)",
operator: "<",
value: [new Float32Array([53.3730617,-6.3000515]), 10000],
},
```

## Data Partitioning and Composite Keys

In some systems, you may wish to partition the data for various reasons, perhaps by user or by session. Data in Cassandra
Expand Down Expand Up @@ -146,16 +183,14 @@ In the configuration document, further optional parameters are provided; their d

```typescript
...,
indices: [],
maxConcurrency: 25,
batchSize: 1,
withClause: "",
...
```

| Parameter | Usage |
| ---------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `indices` | Optional, but required if using filtered queries on non-partition columns. Each metadata column (e.g. `<metadata_filter_column>`) to be indexed should appear as an entry in a list, in the format `[{name: "<metadata_filter_column>", value: "(<metadata_filter_column>)"}]`. |
| `maxConcurrency` | How many concurrent requests will be sent to Cassandra at a given time. |
| `batchSize` | How many documents will be sent on a single request to Cassandra. When using a value > 1, you should ensure your batch size will not exceed the Cassandra parameter `batch_size_fail_threshold_in_kb`. Batches are unlogged. |
| `withClause` | Cassandra tables may be created with an optional `WITH` clause; this is generally not needed but provided for completeness. |
| Parameter | Usage |
| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `maxConcurrency` | How many concurrent requests will be sent to Cassandra at a given time. |
| `batchSize` | How many documents will be sent on a single request to Cassandra. When using a value > 1, you should ensure your batch size will not exceed the Cassandra parameter `batch_size_fail_threshold_in_kb`. Batches are unlogged. |
| `withClause` | Cassandra tables may be created with an optional `WITH` clause; this is generally not needed but provided for completeness. |
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ const cassandraConfig = {
};
const client = new Client(cassandraConfig);

// yarn test:single /langchain/src/stores/tests/cassandra.int.test.ts
// For internal testing:
// 1. switch "describe.skip(" to "describe("
// 2. Copy the SCB into the dev container (if using it)
// 3. Export OPENAI_API_KEY, CASSANDRA_SCB, and CASSANDRA_TOKEN
// 4. cd langchainjs/libs/langchain-community
// 5. yarn test:single src/stores/tests/cassandra.int.test.ts
// Once manual testing is complete, re-instate the ".skip"
describe.skip("CassandraChatMessageHistory", () => {
beforeAll(async () => {
await client.execute("DROP TABLE IF EXISTS test.test_message_history;");
Expand Down
116 changes: 92 additions & 24 deletions libs/langchain-community/src/vectorstores/cassandra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ export interface Column {
export interface Index {
name: string;
value: string;
options?: string;
}

export interface Filter {
name: string;
value: unknown;
value: unknown | [unknown, ...unknown[]];
operator?: string;
}

Expand Down Expand Up @@ -72,7 +73,9 @@ export class CassandraStore extends VectorStore {

private indices: Index[];

private isInitialized = false;
private initializationState = 0; // 0: Not Initialized, 1: In Progress, 2: Initialized

private initializationPromise: Promise<void> | null = null;

asyncCaller: AsyncCaller;

Expand Down Expand Up @@ -119,6 +122,11 @@ export class CassandraStore extends VectorStore {
this.withClause = withClause.trim().replace(/^with\s*/i, "");
this.indices = indices;
this.batchSize = batchSize >= 1 ? batchSize : 1;

// Start initialization but don't wait for it to complete here
this.initialize().catch((error) => {
console.error("Error during CassandraStore initialization:", error);
});
}

/**
Expand All @@ -131,11 +139,6 @@ export class CassandraStore extends VectorStore {
if (vectors.length === 0) {
return;
}

if (!this.isInitialized) {
await this.initialize();
}

await this.insertAll(vectors, documents);
}

Expand Down Expand Up @@ -163,9 +166,7 @@ export class CassandraStore extends VectorStore {
k: number,
filter?: WhereClause
): Promise<[Document, number][]> {
if (!this.isInitialized) {
await this.initialize();
}
await this.initialize();

// Ensure we have an array of Filter from the public interface
const filters = this.asFilters(filter);
Expand All @@ -185,8 +186,13 @@ export class CassandraStore extends VectorStore {
const vectorAsFloat32Array = new Float32Array(query);
queryParams.push(vectorAsFloat32Array);
if (filters) {
const values = (filters as Filter[]).map(({ value }) => value);
queryParams.push(...values);
filters.forEach(({ value }) => {
if (Array.isArray(value)) {
queryParams.push(...value);
} else {
queryParams.push(value);
}
});
}
queryParams.push(vectorAsFloat32Array);
queryParams.push(k);
Expand All @@ -201,8 +207,6 @@ export class CassandraStore extends VectorStore {
delete sanitizedRow.text;
delete sanitizedRow.similarity_score;

// A null value in Cassandra evaluates to a deleted column
// as this is treated as a tombstone record for the cell.
Object.keys(sanitizedRow).forEach((key) => {
if (sanitizedRow[key] === null) {
delete sanitizedRow[key];
Expand Down Expand Up @@ -257,6 +261,8 @@ export class CassandraStore extends VectorStore {
args: CassandraLibArgs
): Promise<CassandraStore> {
const instance = new this(embeddings, args);
await instance.initialize();

await instance.addDocuments(docs);
return instance;
}
Expand All @@ -283,6 +289,29 @@ export class CassandraStore extends VectorStore {
* @returns Promise that resolves when the database has been initialized.
*/
private async initialize(): Promise<void> {
if (this.initializationState === 2) {
// Already initialized
return Promise.resolve();
}

if (this.initializationState === 1 && this.initializationPromise) {
// Initialization in progress, wait for it to complete
return this.initializationPromise;
}

// Start the initialization process
this.initializationState = 1;
this.initializationPromise = new Promise((resolve, reject) => {
this.performInitialization().then(resolve).catch(reject);
});

return this.initializationPromise;
}

/**
* Method to perform the initialization tasks
*/
private async performInitialization() {
let cql = "";
cql = `CREATE TABLE IF NOT EXISTS ${this.keyspace}.${this.table} (
${this.primaryKey.map((col) => `${col.name} ${col.type}`).join(", ")}
Expand Down Expand Up @@ -316,15 +345,30 @@ export class CassandraStore extends VectorStore {
cql = `CREATE CUSTOM INDEX IF NOT EXISTS idx_vector_${this.table}
ON ${this.keyspace}.${
this.table
}(vector) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function': '${this.vectorType.toUpperCase()}'};`;
}(vector) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function': '${this.vectorType.toLowerCase()}'};`;
await this.client.execute(cql);

for await (const { name, value } of this.indices) {
const formatOptions = (options: string | undefined): string => {
if (!options) {
return "";
}

let formattedOptions = options.trim();
if (!formattedOptions.toLowerCase().startsWith("with options =")) {
formattedOptions = "WITH OPTIONS = " + formattedOptions;
}

return formattedOptions;
};

for await (const { name, value, options } of this.indices) {
const optionsClause = formatOptions(options);
cql = `CREATE CUSTOM INDEX IF NOT EXISTS idx_${this.table}_${name}
ON ${this.keyspace}.${this.table} ${value} USING 'StorageAttachedIndex';`;
ON ${this.keyspace}.${this.table} ${value} USING 'StorageAttachedIndex' ${optionsClause};`;
await this.client.execute(cql);
}
this.isInitialized = true;

this.initializationState = 2; // Mark as initialized
}

/**
Expand Down Expand Up @@ -426,9 +470,33 @@ export class CassandraStore extends VectorStore {
return "";
}

const whereConditions = filters.map(
({ name, operator = "=" }) => `${name} ${operator} ?`
);
const whereConditions = filters.map(({ name, operator = "=", value }) => {
// If value is not an array or an array with only one element, use a single '?'
if (!Array.isArray(value) || value.length === 1) {
return `${name} ${operator} ?`;
}

// From this point, value is an array with multiple elements

// Count '?' placeholders in 'name', excluding those inside quotes
const quotesPattern = /'[^']*'|"[^"]*"/g; // Pattern to match quoted strings (both single and double quotes)
const modifiedName = name.replace(quotesPattern, ""); // Remove quoted strings from 'name'
const nameQuestionMarkCount = (modifiedName.match(/\?/g) || []).length; // Count '?' in the modified string

// Check if there are enough elements in the array for the right side of the operator
if (value.length - nameQuestionMarkCount < 1) {
throw new Error(
"Insufficient bind variables for the filter condition."
);
}

// Generate the placeholders for the right side of the operator
const rightPlaceholders = new Array(value.length - nameQuestionMarkCount)
.fill("?")
.join(", ");

return `${name} ${operator} ${rightPlaceholders}`;
});

return `WHERE ${whereConditions.join(" AND ")}`;
}
Expand Down Expand Up @@ -460,6 +528,8 @@ export class CassandraStore extends VectorStore {
batchVectors: number[][],
batchDocuments: Document[]
): Promise<void> {
await this.initialize();

// Input validation: Check if the lengths of batchVectors and batchDocuments are the same
if (batchVectors.length !== batchDocuments.length) {
throw new Error(
Expand Down Expand Up @@ -532,9 +602,7 @@ export class CassandraStore extends VectorStore {
}

// Ensure the store is initialized before proceeding
if (!this.isInitialized) {
await this.initialize();
}
await this.initialize();

// Initialize an array to hold promises for each batch insert
const insertPromises: Promise<void>[] = [];
Expand Down
Loading

0 comments on commit 0ede565

Please sign in to comment.