match : embeddingMatches) {
+
+ Metadata matchMetadata = match.embedded().metadata();
+ String sourceId = matchMetadata.getString(Constants.METADATA_KEY_SOURCE_ID);
+ String index = matchMetadata.getString(Constants.METADATA_KEY_INDEX);
+ String fileName = matchMetadata.getString(Constants.METADATA_KEY_FILE_NAME);
+ String url = matchMetadata.getString(Constants.METADATA_KEY_URL);
+ String fullPath = matchMetadata.getString(Constants.METADATA_KEY_FULL_PATH);
+ String absoluteDirectoryPath = matchMetadata.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH);
+ String ingestionDatetime = matchMetadata.getString(Constants.METADATA_KEY_INGESTION_DATETIME);
+
+ if(lowerBoundaryIngestionDateTime.compareTo(ingestionDatetime) < 0) {
+
+ lowerBoundaryIngestionDateTime = ingestionDatetime;
+ lowerBoundaryIndex = Integer.parseInt(index);
+ } else if(lowerBoundaryIngestionDateTime.compareTo(ingestionDatetime) == 0) {
+
+ if(Integer.parseInt(index) > lowerBoundaryIndex) {
+ lowerBoundaryIndex = Integer.parseInt(index);
+ }
+ }
+
+ JSONObject sourceObject = new JSONObject();
+ sourceObject.put("segmentCount", Integer.parseInt(index) + 1);
+ sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId);
+ sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath);
+ sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath);
+ sourceObject.put(Constants.METADATA_KEY_FILE_NAME, fileName);
+ sourceObject.put(Constants.METADATA_KEY_URL, url);
+ sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime);
+
+ String sourceUniqueKey = getSourceUniqueKey(sourceObject);
+
+ // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key
+ if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) {
+
+ // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments)
+ if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){
+
+ int currentSegmentCount = Integer.parseInt(index) + 1;
+ int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount");
+ if(currentSegmentCount > storedSegmentCount) {
+
+ sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject);
+ }
+
+ } else {
+
+ sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject);
+ }
+ }
+ currentPageEmbeddingId = match.embeddingId();
+ }
+
+ if(previousPageEmbeddingId.compareTo(currentPageEmbeddingId) == 0) {
+ break;
+ } else {
+ previousPageEmbeddingId = currentPageEmbeddingId;
+ }
+
+ } while(embeddingMatches.size() == queryParams.embeddingPageSize());
+
+ jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values()));
+ jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size());
+ return jsonObject;
+ }
+
+ /**
+ * Retrieves a unique key for a given source object by checking specific metadata fields.
+ *
+ * The method first attempts to retrieve a unique identifier using the source ID (if available).
+ * If the source ID is not present, it generates an alternative key by concatenating the
+ * {@code fullPath} or {@code url} (whichever is available) with the {@code ingestionDatetime}.
+ *
+ *
+ * @param sourceObject A {@code JSONObject} containing metadata fields for the source. The expected
+ * keys include {@code METADATA_KEY_SOURCE_ID}, {@code METADATA_KEY_URL},
+ * {@code METADATA_KEY_FULL_PATH}, and {@code METADATA_KEY_INGESTION_DATETIME}.
+ * @return A unique key as a {@code String}. If {@code sourceId} is present, it is returned directly.
+ * Otherwise, the alternative key, based on available fields, is generated and returned.
+ * Returns an empty string if all fields are missing or empty.
+ */
+ protected String getSourceUniqueKey(JSONObject sourceObject) {
+
+ String sourceId = sourceObject.has(Constants.METADATA_KEY_SOURCE_ID) ? sourceObject.getString(Constants.METADATA_KEY_SOURCE_ID) : "";
+
+ String url = sourceObject.has(Constants.METADATA_KEY_URL) ? sourceObject.getString(Constants.METADATA_KEY_URL) : "";
+ String fullPath = sourceObject.has(Constants.METADATA_KEY_FULL_PATH) ? sourceObject.getString(Constants.METADATA_KEY_FULL_PATH) : "";
+ String ingestionDatetime = sourceObject.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? sourceObject.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : "";
+
+ String alternativeKey =
+ ((fullPath != null && !fullPath.isEmpty()) ? fullPath :
+ (url != null && !url.isEmpty()) ? url : "") +
+ ((ingestionDatetime != null && !ingestionDatetime.isEmpty()) ? ingestionDatetime : "");
+
+ return !sourceId.isEmpty() ? sourceId : alternativeKey;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String storeName;
+ private Configuration configuration;
+ private QueryParameters queryParams;
+ private EmbeddingModelNameParameters modelParams;
+ private EmbeddingModel embeddingModel;
+
+ public Builder() {
+
+ }
+
+ public Builder storeName(String storeName) {
+ this.storeName = storeName;
+ return this;
+ }
+
+ public Builder configuration(Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder queryParams(QueryParameters queryParams) {
+ this.queryParams = queryParams;
+ return this;
+ }
+
+ public Builder modelParams(EmbeddingModelNameParameters modelParams) {
+ this.modelParams = modelParams;
+ return this;
+ }
+
+ public Builder embeddingModel(EmbeddingModel embeddingModel) {
+ this.embeddingModel = embeddingModel;
+ return this;
+ }
+
+ public VectorStore build() {
+
+ VectorStore embeddingStore;
+
+ switch (configuration.getVectorStore()) {
+
+ case Constants.VECTOR_STORE_MILVUS:
+
+ embeddingStore = new MilvusVectorStore(storeName, configuration, queryParams, modelParams);
+ break;
+
+ case Constants.VECTOR_STORE_AI_SEARCH:
+
+ case Constants.VECTOR_STORE_CHROMA:
+
+ case Constants.VECTOR_STORE_PINECONE:
+
+ case Constants.VECTOR_STORE_ELASTICSEARCH:
+
+ case Constants.VECTOR_STORE_PGVECTOR:
+
+ case Constants.VECTOR_STORE_WEAVIATE:
+
+ embeddingStore = new VectorStore(storeName, configuration, queryParams, modelParams);
+ break;
+
+ default:
+ //throw new IllegalOperationException("Unsupported Vector Store: " + configuration.getVectorStore());
+ embeddingStore = null;
+ }
+ return embeddingStore;
+ }
+ }
+}
diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java
new file mode 100644
index 0000000..ee10aa2
--- /dev/null
+++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/helper/store/milvus/MilvusVectorStore.java
@@ -0,0 +1,142 @@
+package org.mule.extension.mulechain.vectors.internal.helper.store.milvus;
+
+import com.google.gson.JsonObject;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.QueryResults;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.dml.QueryIteratorParam;
+import io.milvus.param.dml.QueryParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.response.QueryResultsWrapper;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.mule.extension.mulechain.vectors.internal.config.Configuration;
+import org.mule.extension.mulechain.vectors.internal.constant.Constants;
+import org.mule.extension.mulechain.vectors.internal.helper.parameter.EmbeddingModelNameParameters;
+import org.mule.extension.mulechain.vectors.internal.helper.parameter.QueryParameters;
+import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore;
+import org.mule.extension.mulechain.vectors.internal.util.JsonUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile;
+
+public class MilvusVectorStore extends VectorStore {
+
+ private String uri;
+
+ public MilvusVectorStore(String storeName, Configuration configuration, QueryParameters queryParams, EmbeddingModelNameParameters modelParams) {
+
+ super(storeName, configuration, queryParams, modelParams);
+
+ JSONObject config = readConfigFile(configuration.getConfigFilePath());
+ JSONObject vectorStoreConfig = config.getJSONObject(Constants.VECTOR_STORE_MILVUS);
+ this.uri = vectorStoreConfig.getString("MILVUS_URL");
+ }
+
+ public JSONObject listSources() {
+
+ HashMap sourcesJSONObjectHashMap = new HashMap();
+
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("storeName", storeName);
+ JSONArray sources = new JSONArray();
+
+ // Specify the host and port for the Milvus server
+ ConnectParam connectParam = ConnectParam.newBuilder()
+ .withUri(this.uri)
+ .build();
+
+ MilvusServiceClient client = new MilvusServiceClient(connectParam);
+
+ try {
+
+ boolean hasMore = true;
+
+ // Build the query with iterator
+ QueryIteratorParam iteratorParam = QueryIteratorParam.newBuilder()
+ .withCollectionName(storeName)
+ .withBatchSize((long)queryParams.embeddingPageSize())
+ .withOutFields(Arrays.asList("metadata"))
+ .build();
+
+ R queryIteratorRes = client.queryIterator(iteratorParam);
+
+ if (queryIteratorRes.getStatus() != R.Status.Success.getCode()) {
+ System.err.println(queryIteratorRes.getMessage());
+ }
+
+ QueryIterator queryIterator = queryIteratorRes.getData();
+ List results = new ArrayList<>();
+
+ while (hasMore) {
+
+ List batchResults = queryIterator.next();
+
+ if (batchResults.isEmpty()) {
+
+ queryIterator.close();
+ hasMore = false;
+ } else {
+
+ LOGGER.debug("Number of segments in current batch: " + batchResults.size());
+ for (QueryResultsWrapper.RowRecord rowRecord : batchResults) {
+
+ JsonObject gsonObject = (JsonObject)rowRecord.getFieldValues().get("metadata");
+ LOGGER.debug(gsonObject.toString());
+ JSONObject metadata = new JSONObject(gsonObject.toString());
+
+ String sourceId = metadata.has(Constants.METADATA_KEY_SOURCE_ID) ? metadata.getString(Constants.METADATA_KEY_SOURCE_ID) : null;
+ String index = metadata.has(Constants.METADATA_KEY_INDEX) ? metadata.getString(Constants.METADATA_KEY_INDEX) : null;
+ String fileName = metadata.has(Constants.METADATA_KEY_FILE_NAME) ? metadata.getString(Constants.METADATA_KEY_FILE_NAME) : null;
+ String url = metadata.has(Constants.METADATA_KEY_URL) ? metadata.getString(Constants.METADATA_KEY_URL) : null;
+ String fullPath = metadata.has(Constants.METADATA_KEY_FULL_PATH) ? metadata.getString(Constants.METADATA_KEY_FULL_PATH) : null;
+ String absoluteDirectoryPath = metadata.has(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) ? metadata.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH) : null;
+ String ingestionDatetime = metadata.has(Constants.METADATA_KEY_INGESTION_DATETIME) ? metadata.getString(Constants.METADATA_KEY_INGESTION_DATETIME) : null;
+
+ JSONObject sourceObject = new JSONObject();
+ sourceObject.put("segmentCount", Integer.parseInt(index) + 1);
+ sourceObject.put(Constants.METADATA_KEY_SOURCE_ID, sourceId);
+ sourceObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath);
+ sourceObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath);
+ sourceObject.put(Constants.METADATA_KEY_FILE_NAME, fileName);
+ sourceObject.put(Constants.METADATA_KEY_URL, url);
+ sourceObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime);
+
+ String sourceUniqueKey = getSourceUniqueKey(sourceObject);
+
+ // Add sourceObject to sources only if it has at least one key-value pair and it's possible to generate a key
+ if (!sourceObject.isEmpty() && sourceUniqueKey != null && !sourceUniqueKey.isEmpty()) {
+ // Overwrite sourceObject if current one has a greater index (greatest index represents the number of segments)
+ if(sourcesJSONObjectHashMap.containsKey(sourceUniqueKey)){
+ // Get current index
+ int currentSegmentCount = Integer.parseInt(index) + 1;
+ // Get previously stored index
+ int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(sourceUniqueKey).get("segmentCount");
+ // Check if object need to be updated
+ if(currentSegmentCount > storedSegmentCount) {
+ sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject);
+ }
+ } else {
+ sourcesJSONObjectHashMap.put(sourceUniqueKey, sourceObject);
+ }
+ }
+
+ }
+ }
+ }
+ } finally {
+ client.close();
+ }
+
+ jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values()));
+ jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size());
+
+ return jsonObject;
+ }
+}
diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java
index afa4417..02610ea 100644
--- a/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java
+++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/operation/EmbeddingOperations.java
@@ -1,12 +1,10 @@
package org.mule.extension.mulechain.vectors.internal.operation;
-import static dev.langchain4j.store.embedding.filter.MetadataFilterBuilder.metadataKey;
import static org.apache.commons.io.IOUtils.toInputStream;
import static org.mule.extension.mulechain.vectors.internal.util.JsonUtils.readConfigFile;
import static org.mule.runtime.extension.api.annotation.param.MediaType.APPLICATION_JSON;
import java.io.InputStream;
-import java.util.HashMap;
import java.util.List;
import java.nio.charset.StandardCharsets;
@@ -21,7 +19,7 @@
import dev.langchain4j.store.embedding.filter.Filter;
import org.json.JSONArray;
import org.json.JSONObject;
-import org.mule.extension.mulechain.vectors.internal.util.JsonUtils;
+import org.mule.extension.mulechain.vectors.internal.helper.store.VectorStore;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.*;
@@ -303,10 +301,6 @@ public InputStream queryByFilterFromEmbedding(String storeName, String question,
jsonObject.put("question", question);
JSONArray sources = new JSONArray();
- String absoluteDirectoryPath;
- String fileName;
- String url;
- String ingestionDateTime;
JSONObject contentObject;
String fullPath;
@@ -361,117 +355,17 @@ public InputStream listSourcesFromStore(String storeName,
EmbeddingOperationValidator.validateOperationType(
Constants.EMBEDDING_OPERATION_TYPE_FILTER_BY_METADATA,configuration.getVectorStore());
+ EmbeddingOperationValidator.validateOperationType(
+ Constants.EMBEDDING_OPERATION_TYPE_QUERY_ALL,configuration.getVectorStore());
- EmbeddingModel embeddingModel = EmbeddingModelFactory.createModel(configuration, modelParams);
- EmbeddingStore store = EmbeddingStoreFactory.createStore(configuration, storeName, embeddingModel.dimension());
-
- // Create a general query vector (e.g., zero vector). Zero vector is often used when you need to retrieve all
- // embeddings without any specific bias.
- float[] queryVector = new float[embeddingModel.dimension()];
- for (int i = 0; i < embeddingModel.dimension(); i++) {
- queryVector[i]=0.0f; // Zero vector
- }
-
- Embedding queryEmbedding = new Embedding(queryVector);
-
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("storeName", storeName);
- JSONArray sources = new JSONArray();
-
- List> embeddingMatches = null;
- HashMap sourcesJSONObjectHashMap = new HashMap();
- String lowerBoundaryIngestionDateTime = "0000-00-00T00:00:00.000Z";
- int lowerBoundaryIndex = -1;
-
- LOGGER.debug("Embedding page size: " + queryParams.embeddingPageSize());
- String previousPageEmbeddingId = "";
- do {
-
- LOGGER.debug("Embedding page filter: lowerBoundaryIngestionDateTime: " + lowerBoundaryIngestionDateTime + ", lowerBoundaryIndex: " + lowerBoundaryIndex);
-
- Filter condition1 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThanOrEqualTo(lowerBoundaryIngestionDateTime);
- Filter condition2 = metadataKey(Constants.METADATA_KEY_INDEX).isGreaterThan(String.valueOf(lowerBoundaryIndex)); // Index must be handled as a String
- Filter condition3 = metadataKey(Constants.METADATA_KEY_INGESTION_DATETIME).isGreaterThan(lowerBoundaryIngestionDateTime);
-
- Filter searchFilter = (condition1.and(condition2)).or(condition3);
-
- EmbeddingSearchRequest searchRequest = EmbeddingSearchRequest.builder()
- .queryEmbedding(queryEmbedding)
- .maxResults(queryParams.embeddingPageSize())
- .minScore(0.0)
- .filter(searchFilter)
- .build();
-
- EmbeddingSearchResult searchResult = store.search(searchRequest);
- embeddingMatches = searchResult.matches();
-
- String currentPageEmbeddingId = "";
- LOGGER.debug("Embedding page matches: " + embeddingMatches.size());
- for (EmbeddingMatch match : embeddingMatches) {
-
- Metadata matchMetadata = match.embedded().metadata();
- String index = matchMetadata.getString(Constants.METADATA_KEY_INDEX);
- String fileName = matchMetadata.getString(Constants.METADATA_KEY_FILE_NAME);
- String url = matchMetadata.getString(Constants.METADATA_KEY_URL);
- String fullPath = matchMetadata.getString(Constants.METADATA_KEY_FULL_PATH);
- String absoluteDirectoryPath = matchMetadata.getString(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH);
- String ingestionDatetime = matchMetadata.getString(Constants.METADATA_KEY_INGESTION_DATETIME);
-
- if(lowerBoundaryIngestionDateTime.compareTo(ingestionDatetime) < 0) {
-
- lowerBoundaryIngestionDateTime = ingestionDatetime;
- lowerBoundaryIndex = Integer.parseInt(index);
- } else if(lowerBoundaryIngestionDateTime.compareTo(ingestionDatetime) == 0) {
-
- if(Integer.parseInt(index) > lowerBoundaryIndex) {
- lowerBoundaryIndex = Integer.parseInt(index);
- }
- }
-
- JSONObject contentObject = new JSONObject();
- contentObject.put("segmentCount", Integer.parseInt(index) + 1);
- contentObject.put(Constants.METADATA_KEY_ABSOLUTE_DIRECTORY_PATH, absoluteDirectoryPath);
- contentObject.put(Constants.METADATA_KEY_FULL_PATH, fullPath);
- contentObject.put(Constants.METADATA_KEY_FILE_NAME, fileName);
- contentObject.put(Constants.METADATA_KEY_URL, url);
- contentObject.put(Constants.METADATA_KEY_INGESTION_DATETIME, ingestionDatetime);
-
- String key =
- ((fullPath != null && !fullPath.isEmpty()) ? fullPath :
- (url != null && !url.isEmpty()) ? url : "") +
- ((ingestionDatetime != null && !ingestionDatetime.isEmpty()) ? ingestionDatetime : "");
-
- // Add contentObject to sources only if it has at least one key-value pair and it's possible to generate a key
- if (!contentObject.isEmpty() && !key.isEmpty()) {
-
- // Overwrite contentObject if current one has a greater index (greatest index represents the number of segments)
- if(sourcesJSONObjectHashMap.containsKey(key)){
-
- int currentSegmentCount = Integer.parseInt(index) + 1;
- int storedSegmentCount = (int) sourcesJSONObjectHashMap.get(key).get("segmentCount");
- if(currentSegmentCount > storedSegmentCount) {
-
- sourcesJSONObjectHashMap.put(key, contentObject);
- }
-
- } else {
-
- sourcesJSONObjectHashMap.put(key, contentObject);
- }
- }
- currentPageEmbeddingId = match.embeddingId();
- }
-
- if(previousPageEmbeddingId.compareTo(currentPageEmbeddingId) == 0) {
- break;
- } else {
- previousPageEmbeddingId = currentPageEmbeddingId;
- }
-
- } while(embeddingMatches.size() == queryParams.embeddingPageSize());
+ VectorStore vectorStore = VectorStore.builder()
+ .storeName(storeName)
+ .configuration(configuration)
+ .queryParams(queryParams)
+ .modelParams(modelParams)
+ .build();
- jsonObject.put("sources", JsonUtils.jsonObjectCollectionToJsonArray(sourcesJSONObjectHashMap.values()));
- jsonObject.put("sourceCount", sourcesJSONObjectHashMap.size());
+ JSONObject jsonObject = vectorStore.listSources();
return toInputStream(jsonObject.toString(), StandardCharsets.UTF_8);
}
diff --git a/src/main/java/org/mule/extension/mulechain/vectors/internal/util/DocumentUtils.java b/src/main/java/org/mule/extension/mulechain/vectors/internal/util/DocumentUtils.java
index ffa64ab..3e56e12 100644
--- a/src/main/java/org/mule/extension/mulechain/vectors/internal/util/DocumentUtils.java
+++ b/src/main/java/org/mule/extension/mulechain/vectors/internal/util/DocumentUtils.java
@@ -23,6 +23,7 @@ public class DocumentUtils {
*/
public static void addMetadataToDocument(Document document, String fileType, String fileName, String filePath) {
+ document.metadata().add(Constants.METADATA_KEY_SOURCE_ID, dev.langchain4j.internal.Utils.randomUUID());
document.metadata().add(Constants.METADATA_KEY_FILE_TYPE, fileType);
document.metadata().add(Constants.METADATA_KEY_FILE_NAME, fileName);
document.metadata().add(Constants.METADATA_KEY_FULL_PATH, filePath);