From 104559c2bb9a2f8d132ad06c8394e6a279ae70da Mon Sep 17 00:00:00 2001 From: Tommaso Bolis Date: Tue, 12 Nov 2024 17:26:27 +0100 Subject: [PATCH 1/6] Drafted list sources operation for weviate. Block other vector store from using it until fully tested. --- pom.xml | 2 +- .../vectors/internal/constant/Constants.java | 3 + .../operation/EmbeddingOperations.java | 2 +- .../{helper => }/store/VectorStore.java | 74 ++++++++++--------- .../store/aisearch/AISearchStore.java | 40 +++------- .../store/milvus/MilvusStore.java | 36 +++------ .../store/pgvector/PGVectorStore.java | 35 +++------ 7 files changed, 76 insertions(+), 116 deletions(-) rename src/main/java/org/mule/extension/mulechain/vectors/internal/{helper => }/store/VectorStore.java (83%) rename src/main/java/org/mule/extension/mulechain/vectors/internal/{helper => }/store/aisearch/AISearchStore.java (73%) rename src/main/java/org/mule/extension/mulechain/vectors/internal/{helper => }/store/milvus/MilvusStore.java (65%) rename src/main/java/org/mule/extension/mulechain/vectors/internal/{helper => }/store/pgvector/PGVectorStore.java (80%) diff --git a/pom.xml b/pom.xml index 03d8142..113ed50 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.mule.mulechain mulechain-vectors - 0.1.87-SNAPSHOT + 0.1.88-SNAPSHOT mule-extension MAC Vectors diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java index cc38c9d..c496809 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/constant/Constants.java @@ -41,6 +41,9 @@ private Constants() {} public static final String VECTOR_STORE_AI_SEARCH = "AI_SEARCH"; public static final String VECTOR_STORE_NEO4J = "NEO4J"; + public static final String STORE_SCHEMA_METADATA_FIELD_NAME = "metadata"; + public static final String 
STORE_SCHEMA_VECTOR_FIELD_NAME = "vector"; + public static final String METADATA_KEY_SOURCE_ID = "source_id"; public static final String METADATA_KEY_INDEX = "index"; public static final String METADATA_KEY_FILE_NAME = "file_name"; diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java index 02610ea..4d24b5c 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java @@ -19,7 +19,7 @@ import dev.langchain4j.store.embedding.filter.Filter; import org.json.JSONArray; import org.json.JSONObject; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.runtime.extension.api.annotation.Alias; import org.mule.runtime.extension.api.annotation.param.*; diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java similarity index 83% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java index f40e30f..b8b0009 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/VectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java @@ -1,4 +1,4 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store; +package org.mule.extension.mulechain.vectors.internal.store; import dev.langchain4j.data.document.Metadata; import dev.langchain4j.data.embedding.Embedding; @@ -16,9 +16,9 @@ import 
org.mule.extension.mulechain.vectors.internal.helper.factory.EmbeddingStoreFactory; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.aisearch.AISearchStore; -import org.mule.extension.mulechain.vectors.internal.helper.store.milvus.MilvusStore; -import org.mule.extension.mulechain.vectors.internal.helper.store.pgvector.PGVectorStore; +import org.mule.extension.mulechain.vectors.internal.store.aisearch.AISearchStore; +import org.mule.extension.mulechain.vectors.internal.store.milvus.MilvusStore; +import org.mule.extension.mulechain.vectors.internal.store.pgvector.PGVectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +32,11 @@ public class VectorStore { protected static final Logger LOGGER = LoggerFactory.getLogger(VectorStore.class); + protected static final String JSON_KEY_SOURCES = "sources"; + protected static final String JSON_KEY_SEGMENT_COUNT = "segmentCount"; + protected static final String JSON_KEY_SOURCE_COUNT = "sourceCount"; + protected static final String JSON_KEY_STORE_NAME = "storeName"; + protected String storeName; protected Configuration configuration; protected QueryParameters queryParams; @@ -58,8 +63,8 @@ public Embedding getZeroVectorEmbedding() { // Create a general query vector (e.g., zero vector). Zero vector is often used when you need to retrieve all // embeddings without any specific bias. 
- float[] queryVector = new float[embeddingModel.dimension()]; - for (int i = 0; i < embeddingModel.dimension(); i++) { + float[] queryVector = new float[embeddingModel().dimension()]; + for (int i = 0; i < embeddingModel().dimension(); i++) { queryVector[i]=0.0f; // Zero vector } return new Embedding(queryVector); @@ -73,11 +78,11 @@ public JSONObject listSources() { Embedding queryEmbedding = getZeroVectorEmbedding(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); JSONArray sources = new JSONArray(); List> embeddingMatches = null; - HashMap sourcesJSONObjectHashMap = new HashMap(); + HashMap sourceObjectMap = new HashMap(); String lowerBoundaryIngestionDateTime = "0000-00-00T00:00:00.000Z"; int lowerBoundaryIndex = -1; @@ -87,8 +92,7 @@ public JSONObject listSources() { LOGGER.debug("Embedding page filter: lowerBoundaryIngestionDateTime: " + lowerBoundaryIngestionDateTime + ", lowerBoundaryIndex: " + lowerBoundaryIndex); - Filter - condition1 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThanOrEqualTo(lowerBoundaryIngestionDateTime); + Filter condition1 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThanOrEqualTo(lowerBoundaryIngestionDateTime); Filter condition2 = metadataKey(Constants.METADATA_KEY_INDEX).isGreaterThan(String.valueOf(lowerBoundaryIndex)); // Index must be handled as a String Filter condition3 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThan(lowerBoundaryIngestionDateTime); @@ -129,7 +133,7 @@ public JSONObject listSources() { } JSONObject sourceObject = new JSONObject(); - sourceObject.put("segmentCount", Integer.parseInt(index) + 1); + sourceObject.put(JSON_KEY_SEGMENT_COUNT, Integer.parseInt(index) + 1); sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId); sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath); 
sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath); @@ -137,26 +141,8 @@ public JSONObject listSources() { sourceObject.put(Constants.METADATA_KEY_URL, url); sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime); - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - - int currentSegmentCount = Integer.parseInt(index) + 1; - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - if(currentSegmentCount > storedSegmentCount) { + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - - } else { - - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } currentPageEmbeddingId = match.embeddingId(); } @@ -168,8 +154,8 @@ public JSONObject listSources() { } while(embeddingMatches.size() == queryParams.embeddingPageSize()); - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } @@ -206,6 +192,28 @@ protected String getSourceUniqueKey(JSONObject sourceObject) { return !sourceId.isEmpty() ? 
sourceId : alternativeKey; } + protected void addOrUpdateSourceObjectIntoSourceObjectMap(HashMap sourceObjectMap, JSONObject sourceObject) { + + String sourceUniqueKey = getSourceUniqueKey(sourceObject); + + // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key + if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { + // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) + if(sourceObjectMap.containsKey(sourceUniqueKey)){ + // Get current index + int currentSegmentCount = sourceObject.getInt(JSON_KEY_SEGMENT_COUNT); + // Get previously stored index + int storedSegmentCount = (int) sourceObjectMap.get(sourceUniqueKey).get(JSON_KEY_SEGMENT_COUNT); + // Check if object need to be updated + if(currentSegmentCount > storedSegmentCount) { + sourceObjectMap.put(sourceUniqueKey, sourceObject); + } + } else { + sourceObjectMap.put(sourceUniqueKey, sourceObject); + } + } + } + protected JSONObject getSourceObject(JSONObject metadataObject) { String sourceId = metadataObject.has(Constants.METADATA_KEY_SOURCE_ID) ? metadataObject.getString(Constants.METADATA_KEY_SOURCE_ID) : null; @@ -218,7 +226,7 @@ protected JSONObject getSourceObject(JSONObject metadataObject) { String ingestionDatetime = metadataObject.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? 
metadataObject.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null; JSONObject sourceObject = new JSONObject(); - sourceObject.put("segmentCount", Integer.parseInt(index) + 1); + sourceObject.put(JSON_KEY_SEGMENT_COUNT, Integer.parseInt(index) + 1); sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId); sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath); sourceObject.put(Constants.METADATA_KEY_SOURCE, source); diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/aisearch/AISearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java similarity index 73% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/aisearch/AISearchStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java index e04c3c6..82ca202 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/aisearch/AISearchStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java @@ -1,4 +1,4 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store.aisearch; +package org.mule.extension.mulechain.vectors.internal.store.aisearch; import org.json.JSONArray; import org.json.JSONObject; @@ -6,7 +6,7 @@ import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import java.io.BufferedReader; @@ -36,12 +36,12 @@ public AISearchStore(String storeName, Configuration configuration, 
QueryParamet public JSONObject listSources() { - HashMap sourcesJSONObjectHashMap = new HashMap(); + HashMap sourceObjectMap = new HashMap(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); - int segmentCount = 0; // Counter to track the number of documents processed + int segmentCount = 0; // Counter to track the number of segments processed int offset = 0; // Initialize offset for pagination try { @@ -52,7 +52,7 @@ public JSONObject listSources() { do { // Construct the URL with $top and $skip for pagination String urlString = this.url + "/indexes/" + storeName + "/docs?search=*&$top=" + queryParams.embeddingPageSize() + - "&$skip=" + offset + "&$select=id,metadata&api-version=" + API_VERSION; + "&$skip=" + offset + "&$select=id," + Constants.STORE_SCHEMA_METADATA_FIELD_NAME + "&api-version=" + API_VERSION; // Nested loop to handle each page of results while (urlString != null) { @@ -86,7 +86,7 @@ public JSONObject listSources() { JSONObject document = documents.getJSONObject(i); String id = document.getString("id"); // Document ID - JSONObject metadata = document.optJSONObject("metadata"); // Metadata of the document + JSONObject metadata = document.optJSONObject(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); // Metadata of the document if (metadata != null) { @@ -106,24 +106,8 @@ public JSONObject listSources() { } JSONObject sourceObject = getSourceObject(metadataObject); - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - // Get current index - int currentSegmentCount = index + 1; - // Get 
previously stored index - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - // Check if object need to be updated - if(currentSegmentCount > storedSegmentCount) { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } else { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } + + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); LOGGER.debug("sourceObject: " + sourceObject); segmentCount++; // Increment document count @@ -153,7 +137,7 @@ public JSONObject listSources() { } while (hasMore); // Continue if more pages are available // Output total count of processed documents - LOGGER.debug("segmentCount: " + segmentCount); + LOGGER.debug(JSON_KEY_SEGMENT_COUNT + ": " + segmentCount); } catch (Exception e) { @@ -161,8 +145,8 @@ public JSONObject listSources() { LOGGER.error("Error while listing sources", e); } - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java similarity index 65% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java index 3b40428..90d421c 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java @@ -1,4 +1,4 @@ -package 
org.mule.extension.mulechain.vectors.internal.helper.store.milvus; +package org.mule.extension.mulechain.vectors.internal.store.milvus; import com.google.gson.JsonObject; import io.milvus.client.MilvusServiceClient; @@ -7,13 +7,12 @@ import io.milvus.param.dml.QueryIteratorParam; import io.milvus.param.R; import io.milvus.response.QueryResultsWrapper; -import org.json.JSONArray; import org.json.JSONObject; import org.mule.extension.mulechain.vectors.internal.config.Configuration; import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import java.util.ArrayList; @@ -38,10 +37,10 @@ public MilvusStore(String storeName, Configuration configuration, QueryParameter public JSONObject listSources() { - HashMap sourcesJSONObjectHashMap = new HashMap(); + HashMap sourceObjectMap = new HashMap(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); // Specify the host and port for the Milvus server ConnectParam connectParam = ConnectParam.newBuilder() @@ -58,7 +57,7 @@ public JSONObject listSources() { QueryIteratorParam iteratorParam = QueryIteratorParam.newBuilder() .withCollectionName(storeName) .withBatchSize((long)queryParams.embeddingPageSize()) - .withOutFields(Arrays.asList("metadata")) + .withOutFields(Arrays.asList(Constants.STORE_SCHEMA_METADATA_FIELD_NAME)) .build(); R queryIteratorRes = client.queryIterator(iteratorParam); @@ -82,30 +81,13 @@ public JSONObject listSources() { for (QueryResultsWrapper.RowRecord rowRecord : batchResults) { - JsonObject gsonObject = 
(JsonObject)rowRecord.getFieldValues().get("metadata"); + JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); JSONObject metadataObject = new JSONObject(gsonObject.toString()); String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; JSONObject sourceObject = getSourceObject(metadataObject); - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - // Get current index - int currentSegmentCount = Integer.parseInt(index) + 1; - // Get previously stored index - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - // Check if object need to be updated - if(currentSegmentCount > storedSegmentCount) { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } else { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); } } @@ -114,8 +96,8 @@ public JSONObject listSources() { client.close(); } - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/pgvector/PGVectorStore.java 
b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java similarity index 80% rename from src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/pgvector/PGVectorStore.java rename to src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java index 8b47120..c34d9fa 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/pgvector/PGVectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java @@ -1,11 +1,11 @@ -package org.mule.extension.mulechain.vectors.internal.helper.store.pgvector; +package org.mule.extension.mulechain.vectors.internal.store.pgvector; import org.json.JSONObject; import org.mule.extension.mulechain.vectors.internal.config.Configuration; import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import org.postgresql.ds.PGSimpleDataSource; import org.slf4j.Logger; @@ -63,10 +63,10 @@ public PGVectorStore(String storeName, Configuration configuration, QueryParamet */ public JSONObject listSources() { - HashMap sourcesJSONObjectHashMap = new HashMap<>(); + HashMap sourceObjectMap = new HashMap<>(); JSONObject jsonObject = new JSONObject(); - jsonObject.put("storeName", storeName); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); try (PgVectorMetadataIterator iterator = new PgVectorMetadataIterator(userName, password, host, port, database, storeName, (int)queryParams.embeddingPageSize())) { while (iterator.hasNext()) { @@ -76,31 +76,14 @@ public JSONObject 
listSources() { String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; JSONObject sourceObject = getSourceObject(metadataObject); - String sourceUniqueKey = getSourceUniqueKey(sourceObject); - - // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key - if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) { - // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments) - if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){ - // Get current index - int currentSegmentCount = Integer.parseInt(index) + 1; - // Get previously stored index - int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount"); - // Check if object need to be updated - if(currentSegmentCount > storedSegmentCount) { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } else { - sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject); - } - } + addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); } } catch (SQLException e) { LOGGER.error("Error while listing sources", e); } - jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values())); - jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size()); + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); return jsonObject; } @@ -157,7 +140,7 @@ private void fetchNextPage() throws SQLException { pstmt.close(); } - String query = "SELECT metadata FROM " + table + " LIMIT ? OFFSET ?"; + String query = "SELECT " + Constants.STORE_SCHEMA_METADATA_FIELD_NAME + " FROM " + table + " LIMIT ? 
OFFSET ?"; pstmt = connection.prepareStatement(query); pstmt.setInt(1, this.pageSize); pstmt.setInt(2, offset); @@ -198,7 +181,7 @@ public String next() { if (resultSet == null) { throw new NoSuchElementException("No more elements available"); } - return resultSet.getString("metadata"); + return resultSet.getString(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); } catch (SQLException e) { LOGGER.error("Error retrieving next element", e); throw new NoSuchElementException("Error retrieving next element"); From 50a1c80e38113a858930e5a36702986897fbb985 Mon Sep 17 00:00:00 2001 From: Tommaso Bolis Date: Tue, 12 Nov 2024 17:26:45 +0100 Subject: [PATCH 2/6] Drafted list sources operation for weviate. Block other vector store from using it until fully tested. --- .../internal/store/weviate/WeviateStore.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java new file mode 100644 index 0000000..1fcea65 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java @@ -0,0 +1,50 @@ +package org.mule.extension.mulechain.vectors.internal.store.weviate; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +import java.util.HashMap; + +import static 
org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile; + +public class WeviateStore extends VectorStore { + + private String host; + private String protocol; + private String apiKey; + + public WeviateStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_WEAVIATE); + this.host = vectorStoreConfig.getString("WEAVIATE_HOST"); + this.protocol = vectorStoreConfig.getString("WEAVIATE_PROTOCOL"); + this.apiKey = vectorStoreConfig.getString("WEAVIATE_APIKEY"); + } + + public String getIndex() { + + return storeName.substring(0, 1).toUpperCase() + storeName.substring(1); + } + + public JSONObject listSources() { + + HashMap sourceObjectMap = new HashMap(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); + + + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); + + return jsonObject; + } +} From c64512c0ce31fa8167bc7ccc586e91afeda804f4 Mon Sep 17 00:00:00 2001 From: Tommaso Bolis Date: Tue, 12 Nov 2024 17:38:43 +0100 Subject: [PATCH 3/6] Add javadoc --- .../vectors/internal/store/VectorStore.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java index b8b0009..d3b2db7 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java @@ -28,6 +28,11 @@ import static 
dev.langchain4j.store.embedding.filter.MetadataFilterBuilder.metadataKey; +/** + * The {@code VectorStore} class provides a framework for interacting with various types of vector stores, + * enabling storage and retrieval of vector embeddings for data analysis and retrieval purposes. It serves as + * the base class for specific implementations such as Milvus, PGVector, and AI Search stores. + */ public class VectorStore { protected static final Logger LOGGER = LoggerFactory.getLogger(VectorStore.class); @@ -43,6 +48,14 @@ public class VectorStore { protected EmbeddingModelNameParameters modelParams; protected EmbeddingModel embeddingModel; + /** + * Constructs a new {@code VectorStore} instance with specified configurations. + * + * @param storeName the name of the vector store + * @param configuration the configuration object containing settings for the vector store + * @param queryParams parameters for querying the vector store + * @param modelParams parameters for selecting and configuring the embedding model + */ public VectorStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { this.storeName = storeName; @@ -59,6 +72,11 @@ public EmbeddingModel embeddingModel() { return this.embeddingModel; } + /** + * Creates a zero-vector embedding whose length equals the embedding model's dimension, for use as an unbiased query vector. + * + * @return an embedding with every component set to {@code 0.0f} + */ public Embedding getZeroVectorEmbedding() { // Create a general query vector (e.g., zero vector). Zero vector is often used when you need to retrieve all @@ -70,6 +88,11 @@ public Embedding getZeroVectorEmbedding() { return new Embedding(queryVector); } + /** + * Retrieves a JSON object listing sources available in the vector store, including metadata for each source. 
+ * + * @return a JSON object containing a list of sources and their metadata + */ public JSONObject listSources() { dev.langchain4j.store.embedding.EmbeddingStore @@ -192,6 +215,12 @@ protected String getSourceUniqueKey(JSONObject sourceObject) { return !sourceId.isEmpty() ? sourceId : alternativeKey; } + /** + * Adds or updates a source object into the source object map. + * + * @param sourceObjectMap The map of source objects keyed by their unique keys. + * @param sourceObject The source object to add or update. + */ protected void addOrUpdateSourceObjectIntoSourceObjectMap(HashMap sourceObjectMap, JSONObject sourceObject) { String sourceUniqueKey = getSourceUniqueKey(sourceObject); @@ -214,6 +243,16 @@ protected void addOrUpdateSourceObjectIntoSourceObjectMap(HashMap + * The generated source object includes keys such as source ID, file name, URL, full path, and ingestion + * datetime, among others. + *

+ * + * @param metadataObject a {@code JSONObject} containing metadata fields. + * @return a {@code JSONObject} with organized metadata for a source. + */ protected JSONObject getSourceObject(JSONObject metadataObject) { String sourceId = metadataObject.has(Constants.METADATA_KEY_SOURCE_ID) ? metadataObject.getString(Constants.METADATA_KEY_SOURCE_ID) : null; @@ -238,11 +277,28 @@ protected JSONObject getSourceObject(JSONObject metadataObject) { return sourceObject; } + /** + * Provides a {@link Builder} instance for configuring and creating {@code VectorStore} objects. + *

+ * The builder pattern allows for more flexible and readable configuration of a {@code VectorStore}. + * Use this to set parameters such as the store name, configuration, query parameters, and embedding model. + *

+ * + * @return a new {@code Builder} instance. + */ public static Builder builder() { return new Builder(); } + /** + * Builder class for creating instances of {@link VectorStore}. + *

+ * The {@code Builder} class allows you to set various configuration parameters before + * creating a {@code VectorStore} instance. These parameters include the store name, + * configuration settings, query parameters, and embedding model details. + *

+ */ public static class Builder { private String storeName; @@ -255,31 +311,72 @@ public Builder() { } + /** + * Sets the store name for the {@code VectorStore}. + * + * @param storeName the name of the vector store. + * @return the {@code Builder} instance, for method chaining. + */ public Builder storeName(String storeName) { this.storeName = storeName; return this; } + /** + * Sets the configuration for the {@code VectorStore}. + * + * @param configuration the configuration parameters. + * @return the {@code Builder} instance, for method chaining. + */ public Builder configuration(Configuration configuration) { this.configuration = configuration; return this; } + /** + * Sets the query parameters for embedding searches. + * + * @param queryParams the query parameters to use. + * @return the {@code Builder} instance, for method chaining. + */ public Builder queryParams(QueryParameters queryParams) { this.queryParams = queryParams; return this; } + /** + * Sets the parameters for the embedding model. + * + * @param modelParams parameters for selecting and configuring the embedding model. + * @return the {@code Builder} instance, for method chaining. + */ public Builder modelParams(EmbeddingModelNameParameters modelParams) { this.modelParams = modelParams; return this; } + /** + * Sets a pre-configured embedding model for the {@code VectorStore}. + * + * @param embeddingModel the embedding model to use. + * @return the {@code Builder} instance, for method chaining. + */ public Builder embeddingModel(EmbeddingModel embeddingModel) { this.embeddingModel = embeddingModel; return this; } + /** + * Builds and returns a new {@link VectorStore} instance based on the builder's configuration. + *

+ * Depending on the specified configuration, it returns an instance of the appropriate + * store class (e.g., {@link MilvusStore}, {@link PGVectorStore}, or {@link AISearchStore}). + * If no matching store configuration is found, it returns a default {@code VectorStore} instance. + *

+ * + * @return a {@code VectorStore} instance. + * @throws IllegalArgumentException if the configured vector store is unsupported. + */ public VectorStore build() { VectorStore embeddingStore; From 6f485aef54b2dc943f7ea5daf98a1c010c17f691 Mon Sep 17 00:00:00 2001 From: Tommaso Bolis Date: Tue, 12 Nov 2024 18:11:11 +0100 Subject: [PATCH 4/6] Drafted list sources operation for weviate. Block other vector store from using it until fully tested. --- .../store/aisearch/AISearchStore.java | 7 +- .../internal/store/milvus/MilvusStore.java | 4 - .../store/pgvector/PGVectorStore.java | 3 - .../internal/store/weviate/WeaviateStore.java | 118 ++++++++++++++++++ .../internal/store/weviate/WeviateStore.java | 50 -------- 5 files changed, 119 insertions(+), 63 deletions(-) create mode 100644 src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java delete mode 100644 src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java index 82ca202..2b1cf35 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/aisearch/AISearchStore.java @@ -94,19 +94,14 @@ public JSONObject listSources() { JSONArray attributes = metadata.optJSONArray("attributes"); JSONObject metadataObject = new JSONObject(); // Object to store key-value pairs from attributes - - int index =0; // Iterate over attributes array to populate sourceObject for (int j = 0; j < attributes.length(); j++) { + JSONObject attribute = attributes.getJSONObject(j); metadataObject.put(attribute.getString("key"), attribute.get("value")); - if(attribute.getString("key").compareTo(Constants.METADATA_KEY_INDEX) == 0) { - index = 
Integer.parseInt(metadataObject.getString("index")); - } } JSONObject sourceObject = getSourceObject(metadataObject); - addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); LOGGER.debug("sourceObject: " + sourceObject); diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java index 90d421c..c6682bb 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/milvus/MilvusStore.java @@ -83,12 +83,8 @@ public JSONObject listSources() { JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get(Constants.STORE_SCHEMA_METADATA_FIELD_NAME); JSONObject metadataObject = new JSONObject(gsonObject.toString()); - - String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; JSONObject sourceObject = getSourceObject(metadataObject); - addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); - } } } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java index c34d9fa..31c9b6b 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java @@ -72,10 +72,7 @@ public JSONObject listSources() { while (iterator.hasNext()) { JSONObject metadataObject = new JSONObject(iterator.next()); - - String index = metadataObject.has(Constants.METADATA_KEY_INDEX) ? 
metadataObject.getString(Constants.METADATA_KEY_INDEX) : null; JSONObject sourceObject = getSourceObject(metadataObject); - addOrUpdateSourceObjectIntoSourceObjectMap(sourceObjectMap, sourceObject); } } catch (SQLException e) { diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java new file mode 100644 index 0000000..e7288d2 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java @@ -0,0 +1,118 @@ +package org.mule.extension.mulechain.vectors.internal.store.weviate; + +import io.weaviate.client.WeaviateAuthClient; +import io.weaviate.client.WeaviateClient; +import io.weaviate.client.Config; +import io.weaviate.client.v1.auth.exception.AuthException; +import io.weaviate.client.v1.graphql.model.GraphQLResponse; +import io.weaviate.client.v1.graphql.query.Get; +import io.weaviate.client.v1.graphql.query.fields.Field; +import io.weaviate.client.base.Result; +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +import java.util.*; +import java.util.stream.Stream; + +import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile; + +public class WeaviateStore extends VectorStore { + + private String host; + private String protocol; + private String apiKey; + + public WeaviateStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters 
modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_WEAVIATE); + this.host = vectorStoreConfig.getString("WEAVIATE_HOST"); + this.protocol = vectorStoreConfig.getString("WEAVIATE_PROTOCOL"); + this.apiKey = vectorStoreConfig.getString("WEAVIATE_APIKEY"); + } + + public String getIndex() { + + return storeName.substring(0, 1).toUpperCase() + storeName.substring(1); + } + + public JSONObject listSources() { + + HashMap sourceObjectMap = new HashMap(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); + + WeaviateClient weaviateClient; + try { + weaviateClient = WeaviateAuthClient.apiKey(new Config(protocol, host), apiKey); + } catch (AuthException e) { + // handle error in case of authorization problems + throw new RuntimeException(e); + } + + String[] properties = new String[]{"metadata"}; + String cursor = ""; + + getBatchWithCursor(weaviateClient, storeName, properties, queryParams.embeddingPageSize(), cursor); + + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); + + return jsonObject; + } + + private Result getBatchWithCursor(WeaviateClient client, + String className, String[] properties, int batchSize, String cursor) { + Get query = client.graphQL().get() + .withClassName(className) + // Optionally retrieve the vector embedding by adding `vector` to the _additional fields + .withFields(Stream.concat(Arrays.stream(properties), Stream.of("_additional { id vector }")) + .map(prop -> Field.builder().name(prop).build()) + .toArray(Field[]::new) + ) + .withLimit(batchSize); + + if (cursor != null) { + return query.withAfter(cursor).run(); + } + return query.run(); + } + + private List> getProperties(GraphQLResponse result, String 
className, String[] classProperties) { + + Object get = ((Map) result.getData()).get("Get"); + Object clazz = ((Map) get).get(className); + List objects = (List) clazz; + List> res = new ArrayList<>(); + for (Object obj : objects) { + Map objProps = new HashMap<>(); + for (String prop: classProperties) { + Object propValue = ((Map) obj).get(prop); + objProps.put(prop, propValue); + } + Object additional = ((Map) obj).get("_additional"); + Object id = ((Map) additional).get("id"); + objProps.put("id", id); + Object vector = ((Map) additional).get("vector"); + objProps.put("vector", vector); + res.add(objProps); + } + return res; + } + + private int getObjectsCount(GraphQLResponse result, String className) { + + Object get = ((Map) result.getData()).get("Get"); + Object clazz = ((Map) get).get(className); + List objects = (List) clazz; + return objects.size(); + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java deleted file mode 100644 index 1fcea65..0000000 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeviateStore.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.mule.extension.mulechain.vectors.internal.store.weviate; - -import org.json.JSONObject; -import org.mule.extension.mulechain.vectors.internal.config.Configuration; -import org.mule.extension.mulechain.vectors.internal.constant.Constants; -import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; -import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; -import org.mule.extension.mulechain.vectors.internal.store.VectorStore; -import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; - -import java.util.HashMap; - -import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile; - -public class WeviateStore extends 
VectorStore { - - private String host; - private String protocol; - private String apiKey; - - public WeviateStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { - - super(storeName, configuration, queryParams, modelParams); - - JSONObject config = readConfigFile(configuration.getConfigFilePath()); - JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_WEAVIATE); - this.host = vectorStoreConfig.getString("WEAVIATE_HOST"); - this.protocol = vectorStoreConfig.getString("WEAVIATE_PROTOCOL"); - this.apiKey = vectorStoreConfig.getString("WEAVIATE_APIKEY"); - } - - public String getIndex() { - - return storeName.substring(0, 1).toUpperCase() + storeName.substring(1); - } - - public JSONObject listSources() { - - HashMap sourceObjectMap = new HashMap(); - - JSONObject jsonObject = new JSONObject(); - jsonObject.put(JSON_KEY_STORE_NAME, storeName); - - - jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); - jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); - - return jsonObject; - } -} From f469108d498a987836f2926f2983c4df8792fc3c Mon Sep 17 00:00:00 2001 From: Tommaso Bolis Date: Tue, 12 Nov 2024 21:57:14 +0100 Subject: [PATCH 5/6] Add custom store classes for each store type --- .../helper/factory/EmbeddingStoreFactory.java | 9 -- .../operation/EmbeddingOperations.java | 4 +- .../vectors/internal/store/VectorStore.java | 23 ++++- .../internal/store/chroma/ChromaStore.java | 24 +++++ .../elasticsearch/ElasticsearchStore.java | 27 ++++++ .../store/opensearch/OpenSearchStore.java | 27 ++++++ .../store/pinecone/PineconeStore.java | 27 ++++++ .../internal/store/weviate/WeaviateStore.java | 90 ------------------- 8 files changed, 128 insertions(+), 103 deletions(-) create mode 100644 src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java create mode 100644 
src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java create mode 100644 src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java create mode 100644 src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java index 3a8cc99..45eb958 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/factory/EmbeddingStoreFactory.java @@ -183,13 +183,4 @@ private static EmbeddingStore createWeaviateStore(String protocol, .apiKey(apiKey) .build(); } - - /*private static EmbeddingStore createNeo4JStore(String boltURL, String userName, String password, String collectionName, Integer dimension) { - return Neo4jEmbeddingStore.builder() - .withBasicAuth(boltURL, userName, password) - .dimension(dimension) - .databaseName(collectionName) - .build(); - }*/ - } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java index 4d24b5c..e91587e 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java @@ -354,9 +354,9 @@ public InputStream listSourcesFromStore(String storeName, ) { EmbeddingOperationValidator.validateOperationType( - Constants.EMBEDDING_OPERATION_TYPE_FILTER_BY_METADATA,configuration.getVectorStore()); + Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL,configuration.getVectorStore()); 
EmbeddingOperationValidator.validateOperationType( - Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL,configuration.getVectorStore()); + Constants.EMBEDDING_OPERATION_TYPE_FILTER_BY_METADATA,configuration.getVectorStore()); VectorStore vectorStore = VectorStore.builder() .storeName(storeName) diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java index d3b2db7..e8aeaae 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/VectorStore.java @@ -17,8 +17,13 @@ import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; import org.mule.extension.mulechain.vectors.internal.store.aisearch.AISearchStore; +import org.mule.extension.mulechain.vectors.internal.store.chroma.ChromaStore; +import org.mule.extension.mulechain.vectors.internal.store.elasticsearch.ElasticsearchStore; import org.mule.extension.mulechain.vectors.internal.store.milvus.MilvusStore; +import org.mule.extension.mulechain.vectors.internal.store.opensearch.OpenSearchStore; import org.mule.extension.mulechain.vectors.internal.store.pgvector.PGVectorStore; +import org.mule.extension.mulechain.vectors.internal.store.pinecone.PineconeStore; +import org.mule.extension.mulechain.vectors.internal.store.weviate.WeaviateStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -398,15 +403,29 @@ public VectorStore build() { embeddingStore = new AISearchStore(storeName, configuration, queryParams, modelParams); break; + case Constants.VECTOR_STORE_WEAVIATE: + + embeddingStore = new WeaviateStore(storeName, configuration, queryParams, modelParams); + break; + case 
Constants.VECTOR_STORE_CHROMA: + embeddingStore = new ChromaStore(storeName, configuration, queryParams, modelParams); + break; + case Constants.VECTOR_STORE_PINECONE: + embeddingStore = new PineconeStore(storeName, configuration, queryParams, modelParams); + break; + case Constants.VECTOR_STORE_ELASTICSEARCH: - case Constants.VECTOR_STORE_WEAVIATE: + embeddingStore = new ElasticsearchStore(storeName, configuration, queryParams, modelParams); + break; + + case Constants.VECTOR_STORE_OPENSEARCH: - embeddingStore = new VectorStore(storeName, configuration, queryParams, modelParams); + embeddingStore = new OpenSearchStore(storeName, configuration, queryParams, modelParams); break; default: diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java new file mode 100644 index 0000000..25b4275 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java @@ -0,0 +1,24 @@ +package org.mule.extension.mulechain.vectors.internal.store.chroma; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class ChromaStore extends VectorStore { + + private String url; + + public ChromaStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = 
JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_ELASTICSEARCH); + this.url = vectorStoreConfig.getString("CHROMA_URL"); + } + +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java new file mode 100644 index 0000000..d5a47b9 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/elasticsearch/ElasticsearchStore.java @@ -0,0 +1,27 @@ +package org.mule.extension.mulechain.vectors.internal.store.elasticsearch; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class ElasticsearchStore extends VectorStore { + + private String url; + private String userName; + private String password; + + public ElasticsearchStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_ELASTICSEARCH); + this.url = vectorStoreConfig.getString("ELASTICSEARCH_URL"); + this.userName = vectorStoreConfig.getString("ELASTICSEARCH_USER"); + this.password = vectorStoreConfig.getString("ELASTICSEARCH_PASSWORD"); + } +} diff --git 
a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java new file mode 100644 index 0000000..6753f15 --- /dev/null +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/opensearch/OpenSearchStore.java @@ -0,0 +1,27 @@ +package org.mule.extension.mulechain.vectors.internal.store.opensearch; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class OpenSearchStore extends VectorStore { + + private String url; + private String userName; + private String password; + + public OpenSearchStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_OPENSEARCH); + this.url = vectorStoreConfig.getString("OPENSEARCH_URL"); + this.userName = vectorStoreConfig.getString("OPENSEARCH_USER"); + this.password = vectorStoreConfig.getString("OPENSEARCH_PASSWORD"); + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java new file mode 100644 index 0000000..2712507 --- /dev/null +++ 
b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pinecone/PineconeStore.java @@ -0,0 +1,27 @@ +package org.mule.extension.mulechain.vectors.internal.store.pinecone; + +import org.json.JSONObject; +import org.mule.extension.mulechain.vectors.internal.config.Configuration; +import org.mule.extension.mulechain.vectors.internal.constant.Constants; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; +import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; +import org.mule.extension.mulechain.vectors.internal.store.VectorStore; +import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; + +public class PineconeStore extends VectorStore { + + private String apiKey; + private String cloud; + private String cloudRegion; + + public PineconeStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { + + super(storeName, configuration, queryParams, modelParams); + + JSONObject config = JsonUtils.readConfigFile(configuration.getConfigFilePath()); + JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_PINECONE); + this.apiKey = vectorStoreConfig.getString("PINECONE_APIKEY"); + this.cloud = vectorStoreConfig.getString("PINECONE_SERVERLESS_CLOUD"); + this.cloudRegion = vectorStoreConfig.getString("PINECONE_SERVERLESS_REGION"); + } +} diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java index e7288d2..cc9ee8c 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/weviate/WeaviateStore.java @@ -1,23 +1,11 @@ package org.mule.extension.mulechain.vectors.internal.store.weviate; -import io.weaviate.client.WeaviateAuthClient; 
-import io.weaviate.client.WeaviateClient; -import io.weaviate.client.Config; -import io.weaviate.client.v1.auth.exception.AuthException; -import io.weaviate.client.v1.graphql.model.GraphQLResponse; -import io.weaviate.client.v1.graphql.query.Get; -import io.weaviate.client.v1.graphql.query.fields.Field; -import io.weaviate.client.base.Result; import org.json.JSONObject; import org.mule.extension.mulechain.vectors.internal.config.Configuration; import org.mule.extension.mulechain.vectors.internal.constant.Constants; import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters; import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters; import org.mule.extension.mulechain.vectors.internal.store.VectorStore; -import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; - -import java.util.*; -import java.util.stream.Stream; import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile; @@ -37,82 +25,4 @@ public WeaviateStore(String storeName, Configuration configuration, QueryParamet this.protocol = vectorStoreConfig.getString("WEAVIATE_PROTOCOL"); this.apiKey = vectorStoreConfig.getString("WEAVIATE_APIKEY"); } - - public String getIndex() { - - return storeName.substring(0, 1).toUpperCase() + storeName.substring(1); - } - - public JSONObject listSources() { - - HashMap sourceObjectMap = new HashMap(); - - JSONObject jsonObject = new JSONObject(); - jsonObject.put(JSON_KEY_STORE_NAME, storeName); - - WeaviateClient weaviateClient; - try { - weaviateClient = WeaviateAuthClient.apiKey(new Config(protocol, host), apiKey); - } catch (AuthException e) { - // handle error in case of authorization problems - throw new RuntimeException(e); - } - - String[] properties = new String[]{"metadata"}; - String cursor = ""; - - getBatchWithCursor(weaviateClient, storeName, properties, queryParams.embeddingPageSize(), cursor); - - jsonObject.put(JSON_KEY_SOURCES, 
JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); - jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); - - return jsonObject; - } - - private Result getBatchWithCursor(WeaviateClient client, - String className, String[] properties, int batchSize, String cursor) { - Get query = client.graphQL().get() - .withClassName(className) - // Optionally retrieve the vector embedding by adding `vector` to the _additional fields - .withFields(Stream.concat(Arrays.stream(properties), Stream.of("_additional { id vector }")) - .map(prop -> Field.builder().name(prop).build()) - .toArray(Field[]::new) - ) - .withLimit(batchSize); - - if (cursor != null) { - return query.withAfter(cursor).run(); - } - return query.run(); - } - - private List> getProperties(GraphQLResponse result, String className, String[] classProperties) { - - Object get = ((Map) result.getData()).get("Get"); - Object clazz = ((Map) get).get(className); - List objects = (List) clazz; - List> res = new ArrayList<>(); - for (Object obj : objects) { - Map objProps = new HashMap<>(); - for (String prop: classProperties) { - Object propValue = ((Map) obj).get(prop); - objProps.put(prop, propValue); - } - Object additional = ((Map) obj).get("_additional"); - Object id = ((Map) additional).get("id"); - objProps.put("id", id); - Object vector = ((Map) additional).get("vector"); - objProps.put("vector", vector); - res.add(objProps); - } - return res; - } - - private int getObjectsCount(GraphQLResponse result, String className) { - - Object get = ((Map) result.getData()).get("Get"); - Object clazz = ((Map) get).get(className); - List objects = (List) clazz; - return objects.size(); - } } From 713db0f8ad1ec5df7ffede9770e9641b7fd451b9 Mon Sep 17 00:00:00 2001 From: Tommaso Bolis Date: Tue, 12 Nov 2024 23:20:29 +0100 Subject: [PATCH 6/6] Implementing list sources operation for chroma directly through apis. Block other vector store from using it until fully tested. 
--- .../internal/store/chroma/ChromaStore.java | 153 ++++++++++++++++++ .../store/pgvector/PGVectorStore.java | 4 +- 2 files changed, 155 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java index 25b4275..56d52f6 100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/chroma/ChromaStore.java @@ -8,10 +8,29 @@ import org.mule.extension.mulechain.vectors.internal.store.VectorStore; import org.mule.extension.mulechain.vectors.internal.util.JsonUtils; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; + +/** + * ChromaStore is a specialized implementation of {@link VectorStore} designed to interact with + * the Chroma database for managing vector data and sources. + */ public class ChromaStore extends VectorStore { private String url; + + /** + * Initializes a new instance of ChromaStore. + * + * @param storeName the name of the vector store. + * @param configuration the configuration object containing necessary settings. + * @param queryParams parameters related to query configurations. + * @param modelParams parameters for embedding model configurations. + */ public ChromaStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) { super(storeName, configuration, queryParams, modelParams); @@ -21,4 +40,138 @@ public ChromaStore(String storeName, Configuration configuration, QueryParameter this.url = vectorStoreConfig.getString("CHROMA_URL"); } + /** + * Retrieves a JSON object listing all sources associated with the store. + * + * @return a {@link JSONObject} containing details of all sources. 
+ */ + public JSONObject listSources() { + + HashMap sourceObjectMap = new HashMap(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put(JSON_KEY_STORE_NAME, storeName); + + long segmentCount = 0; // Counter to track the number of segments processed + long offset = 0; // Initialize offset for pagination + + try { + + String collectionId = getCollectionId(storeName); + segmentCount = getSegmentCount(collectionId); + + while(offset < segmentCount) { + + } + + } catch (Exception e) { + + // Handle any exceptions that occur during the process + LOGGER.error("Error while listing sources", e); + } + + jsonObject.put(JSON_KEY_SOURCES, JsonUtils.jsonObjectCollectionToJsonArray(sourceObjectMap.values())); + jsonObject.put(JSON_KEY_SOURCE_COUNT, sourceObjectMap.size()); + + return jsonObject; + } + + /** + * Retrieves the total number of segments in the specified collection. + * + * @param collectionId the ID of the collection. + * @return the segment count as a {@code long}. + */ + private long getSegmentCount(String collectionId) { + + long segmentCount = 0; + try { + + String urlString = url + "/api/v1/collections/" + collectionId + "/count"; + URL url = new URL(urlString); + + // Open connection and configure HTTP request + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Content-Type", "application/json"); + + // Check the response code and handle accordingly + if (connection.getResponseCode() == 200) { + + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); + StringBuilder responseBuilder = new StringBuilder(); + String line; + + // Read response line by line + while ((line = in.readLine()) != null) { + responseBuilder.append(line); + } + in.close(); + segmentCount = Long.parseLong(responseBuilder.toString()); + + } else { + + // Log any error responses from the server + LOGGER.error("Error: " + connection.getResponseCode() 
+ " " + connection.getResponseMessage()); + } + + } catch (Exception e) { + + // Handle any exceptions that occur during the process + LOGGER.error("Error getting collection count", e); + } + LOGGER.debug("segmentCount: " + segmentCount); + return segmentCount; + } + + /** + * Retrieves the collection ID for a given store name. + * + * @param storeName the name of the store. + * @return the collection ID as a {@code String}. + */ + private String getCollectionId(String storeName) { + + String collectionId = ""; + try { + + String urlString = url + "/api/v1/collections/" + storeName; + URL url = new URL(urlString); + + // Open connection and configure HTTP request + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Content-Type", "application/json"); + + // Check the response code and handle accordingly + if (connection.getResponseCode() == 200) { + + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())); + StringBuilder responseBuilder = new StringBuilder(); + String line; + + // Read response line by line + while ((line = in.readLine()) != null) { + responseBuilder.append(line); + } + in.close(); + + // Parse JSON response + JSONObject jsonResponse = new JSONObject(responseBuilder.toString()); + collectionId = jsonResponse.getString("id"); + + } else { + + // Log any error responses from the server + LOGGER.error("Error: " + connection.getResponseCode() + " " + connection.getResponseMessage()); + } + + } catch (Exception e) { + + // Handle any exceptions that occur during the process + LOGGER.error("Error getting collection id", e); + } + LOGGER.debug("collectionId: " + collectionId); + return collectionId; + } } diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java index 31c9b6b..7bcf9f9 
100644 --- a/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java +++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/store/pgvector/PGVectorStore.java @@ -88,7 +88,7 @@ public JSONObject listSources() { /** * Iterator to handle metadata pagination from the PostgreSQL database. */ - public class PgVectorMetadataIterator implements Iterator, AutoCloseable { + private class PgVectorMetadataIterator implements Iterator, AutoCloseable { private int offset = 0; // Current offset for pagination private ResultSet resultSet; @@ -109,7 +109,7 @@ public class PgVectorMetadataIterator implements Iterator, AutoCloseable * @param pageSize The number of rows per page for pagination. * @throws SQLException If a database error occurs. */ - public PgVectorMetadataIterator(String userName, String password, String host, int port, String database, String table, int pageSize) throws SQLException { + private PgVectorMetadataIterator(String userName, String password, String host, int port, String database, String table, int pageSize) throws SQLException { // Initialize the connection and the first page of data PGSimpleDataSource dataSource = new PGSimpleDataSource();