Fix: Milvus error handling #292

Merged · 13 commits · Jan 7, 2025
CHANGELOG.md — 3 changes: 2 additions & 1 deletion

```diff
@@ -1,8 +1,9 @@
-## 0.3.12-dev3
+## 0.3.12-dev4
 
 ### Enhancements
 
 * **Migrate Vectara Destination Connector to v2**
+* **Improved Milvus error handling**
 
 ## 0.3.12-dev2
 
```
pyproject.toml — 3 changes: 3 additions & 0 deletions

```diff
@@ -74,3 +74,6 @@ omit=[
     "unstructured_ingest/ingest_backoff/*",
     "unstructured_ingest/enhanced_dataclass/*"
 ]
+
+[tool.pytest.ini_options]
+asyncio_default_fixture_loop_scope="function"
```
Contributor Author:

Without this setting, the test run was emitting:

```
.venv/lib/python3.10/site-packages/pytest_asyncio/plugin.py:208: PytestDeprecationWarning: The configuration option "asyncio_default_fixture_loop_scope" is unset.
The event loop scope for asynchronous fixtures will default to the fixture caching scope. Future versions of pytest-asyncio will default the loop scope for asynchronous fixtures to function scope. Set the default fixture loop scope explicitly in order to avoid unexpected behavior in the future. Valid fixture loop scopes are: "function", "class", "module", "package", "session"
```
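
For context, a minimal hypothetical sketch (not from this repo) of the kind of async fixture the setting governs; with the loop scope pinned to `"function"`, each test gets its own event loop, which is the default future pytest-asyncio versions will adopt:

```python
# Hypothetical example, not part of this PR. Assumes pytest and pytest-asyncio
# are installed. With asyncio_default_fixture_loop_scope = "function" set in
# pyproject.toml, this fixture runs on a fresh per-test event loop, so
# loop-bound resources (connections, tasks) cannot leak across tests.
import asyncio

import pytest
import pytest_asyncio


@pytest_asyncio.fixture
async def warmed_up_client():
    await asyncio.sleep(0)  # stand-in for async setup work
    yield "client"


@pytest.mark.asyncio
async def test_client_is_ready(warmed_up_client):
    assert warmed_up_client == "client"
```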

test/integration/connectors/test_milvus.py — 13 changes: 13 additions & 0 deletions

```diff
@@ -174,6 +174,19 @@ def test_precheck_fails_on_nonexistent_collection(collection: str):
     uploader.precheck()
 
 
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
+def test_precheck_fails_on_nonexisting_db(collection: str):
+    uploader = MilvusUploader(
+        connection_config=MilvusConnectionConfig(uri=DB_URI),
+        upload_config=MilvusUploaderConfig(db_name="nonexisting_db", collection_name=collection),
+    )
+    with pytest.raises(
+        DestinationConnectionError,
+        match="database not found",
+    ):
+        uploader.precheck()
+
+
 @pytest.mark.parametrize("upload_file_str", ["upload_file_ndjson", "upload_file"])
 def test_milvus_stager(
     request: TopRequest,
```
unstructured_ingest/__version__.py — 2 changes: 1 addition & 1 deletion

```diff
@@ -1 +1 @@
-__version__ = "0.3.12-dev3" # pragma: no cover
+__version__ = "0.3.12-dev4" # pragma: no cover
```
unstructured_ingest/v2/processes/connectors/chroma.py — 1 change: 0 additions & 1 deletion

```diff
@@ -138,7 +138,6 @@ def precheck(self) -> None:
 
     @DestinationConnectionError.wrap
     def upsert_batch(self, collection, batch):
-
         try:
             # Chroma wants lists even if there is only one element
             # Upserting to prevent duplicates
```
unstructured_ingest/v2/processes/connectors/milvus.py — 21 changes: 15 additions & 6 deletions
```diff
@@ -156,11 +156,18 @@ class MilvusUploader(Uploader):
 
     @DestinationConnectionError.wrap
     def precheck(self):
-        with self.get_client() as client:
-            if not client.has_collection(self.upload_config.collection_name):
-                raise DestinationConnectionError(
-                    f"Collection '{self.upload_config.collection_name}' does not exist"
-                )
+        from pymilvus import MilvusException
+
+        try:
+            with self.get_client() as client:
+                if not client.has_collection(self.upload_config.collection_name):
+                    raise DestinationConnectionError(
+                        f"Collection '{self.upload_config.collection_name}' does not exist"
+                    )
+        except MilvusException as milvus_exception:
+            raise DestinationConnectionError(
+                f"failed to precheck Milvus: {str(milvus_exception.message)}"
+            ) from milvus_exception
 
     @contextmanager
     def get_client(self) -> Generator["MilvusClient", None, None]:
```

Contributor Author:

We have to use `str(milvus_exception.message)`; with just `str(milvus_exception)` we were getting:

```
Failed to redact: 2024-12-05 17:16:18,380 MainProcess ERROR Step precheck failure: MilvusUploader: [DestinationConnectionError] failed to precheck milvus: <MilvusException: (code=<bound method _InactiveRpcError.code of <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Token not found or unauthorized: The provided token does not exist in our records, or you do not have the necessary permissions to use this token. Please ensure you have the correct token."
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2024-12-05T17:16:18.378661811+01:00", grpc_status:2, grpc_message:"Token not found or unauthorized: The provided token does not exist in our records, or you do not have the necessary permissions to use this token. Please ensure you have the correct token."}"
```
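
For illustration, a minimal sketch (hypothetical, not part of the PR) of why the fix formats the error from `milvus_exception.message` rather than the exception object itself; it assumes pymilvus is installed and that `MilvusException` accepts `code` and `message` keyword arguments, as the diff above relies on:

```python
# Hypothetical sketch, not from this PR. Assumes pymilvus is installed and that
# MilvusException carries a `message` attribute, as used in the diff above.
from pymilvus import MilvusException

try:
    # Simulate the kind of failure precheck() can hit.
    raise MilvusException(code=2, message="database not found[database=nonexisting_db]")
except MilvusException as exc:
    # str(exc) renders the whole exception, including its `code` field; for gRPC
    # failures that field can embed a bound-method repr, producing the unreadable
    # log shown in the comment above. The `message` attribute keeps only the text.
    print(str(exc.message))  # -> database not found[database=nonexisting_db]
```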
```diff
@@ -197,7 +204,9 @@ def insert_results(self, data: Union[dict, list[dict]]):
             try:
                 res = client.insert(collection_name=self.upload_config.collection_name, data=data)
             except MilvusException as milvus_exception:
-                raise WriteError("failed to upload records to milvus") from milvus_exception
+                raise WriteError(
+                    f"failed to upload records to Milvus: {str(milvus_exception.message)}"
+                ) from milvus_exception
             if "err_count" in res and isinstance(res["err_count"], int) and res["err_count"] > 0:
                 err_count = res["err_count"]
                 raise WriteError(f"failed to upload {err_count} docs")
```
```diff
@@ -128,7 +128,6 @@ async def run_data_async(
         file_data: FileData,
         **kwargs: Any,
     ) -> None:
-
         batches = list(batch_generator(data, batch_size=self.upload_config.batch_size))
         logger.debug(
             "Elements split into %i batches of size %i.",
```