
[Bug]: Streaming updates do not work in spite of stream_update set to True #1286

Closed
sidoncloud opened this issue Oct 18, 2024 · 5 comments · Fixed by #1367
@sidoncloud

File Name

https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/retrieval-augmented-generation/multimodal_rag_langchain.ipynb

What happened?

Error: FailedPrecondition: 400 StreamUpdate is not enabled on this Index.

Code block: retriever_multi_vector_img.vectorstore.add_documents(summary_docs)

Relevant log output

Upserting datapoints MatchingEngineIndex index: projects/project_id/locations/us-central1/indexes/index_number

---------------------------------------------------------------------------
_InactiveRpcError                         Traceback (most recent call last)
File /opt/conda/lib/python3.10/site-packages/google/api_core/grpc_helpers.py:76, in _wrap_unary_errors.<locals>.error_remapped_callable(*args, **kwargs)
     75 try:
---> 76     return callable_(*args, **kwargs)
     77 except grpc.RpcError as exc:

File /opt/conda/lib/python3.10/site-packages/grpc/_channel.py:1181, in _UnaryUnaryMultiCallable.__call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
   1175 (
   1176     state,
   1177     call,
   1178 ) = self._blocking(
   1179     request, timeout, metadata, credentials, wait_for_ready, compression
   1180 )
-> 1181 return _end_unary_response_blocking(state, call, False, None)

File /opt/conda/lib/python3.10/site-packages/grpc/_channel.py:1006, in _end_unary_response_blocking(state, call, with_call, deadline)
   1005 else:
-> 1006     raise _InactiveRpcError(state)

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.FAILED_PRECONDITION
	details = "StreamUpdate is not enabled on this Index."
	debug_error_string = "UNKNOWN:Error received from peer ipv4:74.125.201.95:443 {created_time:"2024-10-18T07:10:45.543826075+00:00", grpc_status:9, grpc_message:"StreamUpdate is not enabled on this Index."}"
>

The above exception was the direct cause of the following exception:

FailedPrecondition                        Traceback (most recent call last)
Cell In[55], line 1
----> 1 retriever_multi_vector_img.vectorstore.add_documents(summary_docs)

File /opt/conda/lib/python3.10/site-packages/langchain_core/vectorstores/base.py:287, in VectorStore.add_documents(self, documents, **kwargs)
    285     texts = [doc.page_content for doc in documents]
    286     metadatas = [doc.metadata for doc in documents]
--> 287     return self.add_texts(texts, metadatas, **kwargs)
    288 msg = (
    289     f"`add_documents` and `add_texts` has not been implemented "
    290     f"for {self.__class__.__name__} "
    291 )
    292 raise NotImplementedError(msg)

File /opt/conda/lib/python3.10/site-packages/langchain_google_vertexai/vectorstores/vectorstores.py:232, in _BaseVertexAIVectorStore.add_texts(self, texts, metadatas, ids, is_complete_overwrite, **kwargs)
    228 self._document_storage.mset(list(zip(ids, documents)))
    230 embeddings = self._embeddings.embed_documents(texts)
--> 232 self._searcher.add_to_index(
    233     ids, embeddings, metadatas, is_complete_overwrite, **kwargs
    234 )
    236 return ids

File /opt/conda/lib/python3.10/site-packages/langchain_google_vertexai/vectorstores/_searcher.py:128, in VectorSearchSearcher.add_to_index(self, ids, embeddings, metadatas, is_complete_overwrite, **kwargs)
    125 data_points = to_data_points(ids, embeddings, metadatas)
    127 if self._stream_update:
--> 128     stream_update_index(index=self._index, data_points=data_points)
    129 else:
    130     if self._staging_bucket is None:

File /opt/conda/lib/python3.10/site-packages/langchain_google_vertexai/vectorstores/_utils.py:22, in stream_update_index(index, data_points)
     13 def stream_update_index(
     14     index: MatchingEngineIndex, data_points: List["meidx_types.IndexDataPoint"]
     15 ) -> None:
     16     """Updates an index using stream updating.
     17 
     18     Args:
     19         index: Vector search index.
     20         data_points: List of IndexDataPoint.
     21     """
---> 22     index.upsert_datapoints(data_points)

File ~/.local/lib/python3.10/site-packages/google/cloud/aiplatform/matching_engine/matching_engine_index.py:761, in MatchingEngineIndex.upsert_datapoints(self, datapoints, update_mask)
    753 self.wait()
    755 _LOGGER.log_action_start_against_resource(
    756     "Upserting datapoints",
    757     "index",
    758     self,
    759 )
--> 761 self.api_client.upsert_datapoints(
    762     gca_index_service.UpsertDatapointsRequest(
    763         index=self.resource_name,
    764         datapoints=datapoints,
    765         update_mask=(
    766             field_mask_pb2.FieldMask(paths=update_mask) if update_mask else None
    767         ),
    768     )
    769 )
    771 _LOGGER.log_action_completed_against_resource(
    772     "index", "Upserted datapoints", self
    773 )
    775 return self

File ~/.local/lib/python3.10/site-packages/google/cloud/aiplatform_v1/services/index_service/client.py:1402, in IndexServiceClient.upsert_datapoints(self, request, retry, timeout, metadata)
   1399 self._validate_universe_domain()
   1401 # Send the request.
-> 1402 response = rpc(
   1403     request,
   1404     retry=retry,
   1405     timeout=timeout,
   1406     metadata=metadata,
   1407 )
   1409 # Done; return the response.
   1410 return response

File /opt/conda/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py:131, in _GapicCallable.__call__(self, timeout, retry, compression, *args, **kwargs)
    128 if self._compression is not None:
    129     kwargs["compression"] = compression
--> 131 return wrapped_func(*args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/google/api_core/grpc_helpers.py:78, in _wrap_unary_errors.<locals>.error_remapped_callable(*args, **kwargs)
     76     return callable_(*args, **kwargs)
     77 except grpc.RpcError as exc:
---> 78     raise exceptions.from_grpc_error(exc) from exc

FailedPrecondition: 400 StreamUpdate is not enabled on this Index.
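[Editorial note] A hedged reading of the traceback above: the client honors `stream_update=True` (the branch in `_searcher.py` that calls `stream_update_index`), and the FAILED_PRECONDITION is raised server-side, so the client flag alone cannot enable streaming; the index itself must be created for stream updates. A minimal self-contained sketch of that interaction, where `FakeIndex`, `FailedPrecondition`, and `add_to_index` are hypothetical stand-ins, not SDK classes:

```python
class FailedPrecondition(Exception):
    """Stand-in for google.api_core.exceptions.FailedPrecondition."""


class FakeIndex:
    """Simulates a Vertex AI index; stream_enabled mirrors the server-side
    index_update_method setting chosen at index creation time."""

    def __init__(self, stream_enabled: bool):
        self.stream_enabled = stream_enabled

    def upsert_datapoints(self, data_points):
        # The real service rejects streaming upserts on a batch-update index.
        if not self.stream_enabled:
            raise FailedPrecondition("400 StreamUpdate is not enabled on this Index.")
        return list(data_points)


def add_to_index(index, data_points, stream_update=True):
    """Mirrors the client-side branch seen in the traceback: the flag only
    selects the code path; the server decides whether streaming is allowed."""
    if stream_update:
        return index.upsert_datapoints(data_points)
    raise NotImplementedError("batch path omitted in this sketch")
```

With `stream_enabled=False` the sketch raises `FailedPrecondition` exactly as in the log above, regardless of the client flag.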

@russellsimpkins

I have the same issue. I created my index and endpoint using Google's sample code and modified the code to use "stream_update":

DIMENSIONS = 768  # Dimensions output from textembedding-gecko
DEPLOYED_INDEX_ID = "mm_rag_langchain_index_endpoint1"
index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name="mm_rag_langchain_index1",
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=7,
    index_update_method="stream_update",
    description="Multimodal RAG LangChain Index",
)
index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=DEPLOYED_INDEX_ID,
    description="Multimodal RAG LangChain Index Endpoint",
    public_endpoint_enabled=True,
)
index_endpoint = index_endpoint.deploy_index(
    index=index, deployed_index_id="mm_rag_langchain_deployed_index1"
)

My update logic is from that sample code as well.

@russellsimpkins

I found a fix for my issue: STREAM_UPDATE is case-sensitive. Creating the index this way allowed streaming updates:

index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name="mm_rag_langchain_index_stream",
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=7,
    index_update_method="STREAM_UPDATE",
    description="Multimodal RAG LangChain Index",
)

This doc probably needs an update - https://cloud.google.com/vertex-ai/docs/vector-search/create-manage-index#create_index-python_vertex_ai_sdk

# Create Index
index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=display_name,
    description="Matching Engine Index",
    dimensions=100,
    approximate_neighbors_count=150,
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=7,
    index_update_method="batch_update",  # Options: stream_update, batch_update
    distance_measure_type=aiplatform.matching_engine.matching_engine_index_config.DistanceMeasureType.DOT_PRODUCT_DISTANCE,
)

Those should be in upper case: index_update_method="BATCH_UPDATE",  # Options: STREAM_UPDATE, BATCH_UPDATE
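[Editorial note] Since the string is passed through to the API's enum values, a small guard can catch the lower-case variant before the index is ever created. A minimal sketch; `normalize_update_method` is a hypothetical helper, not part of the Vertex AI SDK:

```python
# Accepted enum names for index_update_method in the Vertex AI API.
VALID_UPDATE_METHODS = {"STREAM_UPDATE", "BATCH_UPDATE"}


def normalize_update_method(value: str) -> str:
    """Upper-case the value and reject anything outside the known enum names,
    so a typo like "stream_update" fails fast instead of producing a
    batch-only index that later rejects upsert_datapoints."""
    normalized = value.strip().upper()
    if normalized not in VALID_UPDATE_METHODS:
        raise ValueError(
            f"index_update_method must be one of {sorted(VALID_UPDATE_METHODS)}, "
            f"got {value!r}"
        )
    return normalized


normalize_update_method("stream_update")  # -> "STREAM_UPDATE"
```

The normalized result can then be passed as `index_update_method` to create_tree_ah_index.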

@holtskinner
Collaborator

holtskinner commented Oct 31, 2024

Thanks for the feedback; I'm not sure how it got mixed up in this sample.

https://github.com/googleapis/python-aiplatform/blob/HEAD/samples/model-builder/vector_search/vector_search_create_index_sample.py

Created PR googleapis/python-aiplatform#4605 to fix the sample

@russellsimpkins

russellsimpkins commented Oct 31, 2024 via email

@holtskinner
Collaborator

I think it's a quick fix to update the sample and the docs to make sure folks know to use STREAM_UPDATE vs stream_update.

Docs have been updated. https://cloud.google.com/vertex-ai/docs/vector-search/create-manage-index#create_index-python_vertex_ai_sdk
