From 94c12ff88b8d401e67506e97b4d242e2b0d31738 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 31 Oct 2023 15:29:58 -0600 Subject: [PATCH 01/26] initial commit for weaviate destination connector --- Makefile | 4 + examples/ingest/weaviate/ingest.sh | 20 +++++ requirements/ingest/weaviate.in | 3 + requirements/ingest/weaviate.txt | 49 +++++++++++ .../create-weaviate-instance.sh | 13 +++ .../weaviate-test-helpers/create_schema.py | 18 +++++ .../weaviate-test-helpers/docker-compose.yml | 21 +++++ setup.py | 1 + .../test-ingest-weaviate.sh | 50 ++++++++++++ test_unstructured_ingest/test-ingest.sh | 1 + unstructured/ingest/cli/cmds/weaviate.py | 58 +++++++++++++ unstructured/ingest/connector/weaviate.py | 81 +++++++++++++++++++ unstructured/ingest/runner/writers.py | 25 ++++++ 13 files changed, 344 insertions(+) create mode 100644 examples/ingest/weaviate/ingest.sh create mode 100644 requirements/ingest/weaviate.in create mode 100644 requirements/ingest/weaviate.txt create mode 100644 scripts/weaviate-test-helpers/create-weaviate-instance.sh create mode 100644 scripts/weaviate-test-helpers/create_schema.py create mode 100644 scripts/weaviate-test-helpers/docker-compose.yml create mode 100755 test_unstructured_ingest/test-ingest-weaviate.sh create mode 100644 unstructured/ingest/cli/cmds/weaviate.py create mode 100644 unstructured/ingest/connector/weaviate.py diff --git a/Makefile b/Makefile index edf6763a0f..119e514359 100644 --- a/Makefile +++ b/Makefile @@ -191,6 +191,10 @@ install-ingest-airtable: install-ingest-sharepoint: python3 -m pip install -r requirements/ingest-sharepoint.txt +.PHONY: install-ingest-weaviate +install-ingest-weaviate: + python3 -m pip install -r requirements/ingest/ingest-weaviate.txt + .PHONY: install-ingest-local install-ingest-local: echo "no unique dependencies for local connector" diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh new file mode 100644 index 0000000000..ca42287cf0 --- /dev/null +++ 
b/examples/ingest/weaviate/ingest.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# Uploads the structured output of the files within the given S3 path. + +# Structured outputs are stored in azure-ingest-output/ + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "$SCRIPT_DIR"/../../.. || exit 1 + +PYTHONPATH=. ./unstructured/ingest/main.py \ + s3 \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ + --anonymous \ + --output-dir s3-small-batch-output-to-weaviate \ + --num-processes 2 \ + --verbose \ + --strategy fast \ + weaviate \ + --host-url http://localhost:8080 \ + --class-name pdf_elements \ diff --git a/requirements/ingest/weaviate.in b/requirements/ingest/weaviate.in new file mode 100644 index 0000000000..5336c66d9d --- /dev/null +++ b/requirements/ingest/weaviate.in @@ -0,0 +1,3 @@ +-c ../constraints.in +-c ../base.txt +weaviate-client \ No newline at end of file diff --git a/requirements/ingest/weaviate.txt b/requirements/ingest/weaviate.txt new file mode 100644 index 0000000000..68733b72e7 --- /dev/null +++ b/requirements/ingest/weaviate.txt @@ -0,0 +1,49 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile --constraint=requirements/constraints.in requirements/ingest/weaviate.in +# +authlib==1.2.1 + # via weaviate-client +certifi==2023.7.22 + # via + # -c requirements/constraints.in + # -c requirements/ingest/../base.txt + # -c requirements/ingest/../constraints.in + # requests +cffi==1.16.0 + # via cryptography +charset-normalizer==3.3.1 + # via + # -c requirements/ingest/../base.txt + # requests +cryptography==41.0.5 + # via authlib +idna==3.4 + # via + # -c requirements/ingest/../base.txt + # requests +pycparser==2.21 + # via cffi +requests==2.31.0 + # via + # -c requirements/ingest/../base.txt + # weaviate-client +tqdm==4.66.1 + # via + # -c requirements/ingest/../base.txt + # weaviate-client +urllib3==1.26.18 + # via + # -c 
requirements/constraints.in + # -c requirements/ingest/../base.txt + # -c requirements/ingest/../constraints.in + # requests +validators==0.21.0 + # via weaviate-client +weaviate-client==3.23.2 + # via + # -c requirements/constraints.in + # -c requirements/ingest/../constraints.in + # -r requirements/ingest/weaviate.in diff --git a/scripts/weaviate-test-helpers/create-weaviate-instance.sh b/scripts/weaviate-test-helpers/create-weaviate-instance.sh new file mode 100644 index 0000000000..46ea25283a --- /dev/null +++ b/scripts/weaviate-test-helpers/create-weaviate-instance.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") + +# Create the Weaviate instance +docker compose version +docker compose -f "$SCRIPT_DIR"/docker-compose.yaml up --wait +docker compose -f "$SCRIPT_DIR"/docker-compose.yaml ps + +echo "Instance is live." +python "$SCRIPT_DIR"/create_schema.py diff --git a/scripts/weaviate-test-helpers/create_schema.py b/scripts/weaviate-test-helpers/create_schema.py new file mode 100644 index 0000000000..5757eaf970 --- /dev/null +++ b/scripts/weaviate-test-helpers/create_schema.py @@ -0,0 +1,18 @@ +import os + +import weaviate + +weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "pdf_elements") + +class_schema = { + "class": class_name, + "vectorizer": "none", +} + +client = weaviate.Client( + url=weaviate_host_url, +) +if client.schema.exists(class_name): + client.schema.delete_class(class_name) +client.schema.create_class(class_schema) diff --git a/scripts/weaviate-test-helpers/docker-compose.yml b/scripts/weaviate-test-helpers/docker-compose.yml new file mode 100644 index 0000000000..042cc4c55e --- /dev/null +++ b/scripts/weaviate-test-helpers/docker-compose.yml @@ -0,0 +1,21 @@ +version: '3.4' +services: + weaviate: + command: + - --host + - 0.0.0.0 + - --port + - '8080' + - --scheme + - http + image: semitechnologies/weaviate:1.21.8 + ports: + - 
8080:8080 + restart: on-failure:0 + environment: + QUERY_DEFAULTS_LIMIT: 25 + AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' + PERSISTENCE_DATA_PATH: '/var/lib/weaviate' + DEFAULT_VECTORIZER_MODULE: 'none' + ENABLE_MODULES: '' + CLUSTER_HOSTNAME: 'node1' diff --git a/setup.py b/setup.py index e3b0e341d3..fd2b6171c3 100644 --- a/setup.py +++ b/setup.py @@ -153,6 +153,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List "delta-table": load_requirements("requirements/ingest/delta-table.in"), "salesforce": load_requirements("requirements/ingest/salesforce.in"), "jira": load_requirements("requirements/ingest/jira.in"), + "weaviate": load_requirements("requirements/ingest/weaviate.in"), # Legacy extra requirements "huggingface": load_requirements("requirements/huggingface.in"), "local-inference": all_doc_reqs, diff --git a/test_unstructured_ingest/test-ingest-weaviate.sh b/test_unstructured_ingest/test-ingest-weaviate.sh new file mode 100755 index 0000000000..f48cace922 --- /dev/null +++ b/test_unstructured_ingest/test-ingest-weaviate.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") +cd "$SCRIPT_DIR"/.. 
|| exit 1 +OUTPUT_FOLDER_NAME=s3-weaviate-dest +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME +DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup { + # Index cleanup + echo "Stopping Weaviate Docker container" + docker-compose -f scripts/weaviate-test-helpers/docker-compose.yaml down --remove-orphans -v + + + # Local file cleanup + cleanup_dir "$WORK_DIR" + cleanup_dir "$OUTPUT_DIR" + if [ "$CI" == "true" ]; then + cleanup_dir "$DOWNLOAD_DIR" + fi +} + +trap cleanup EXIT + +# Create weaviate instance and create `elements` class +echo "Creating weaviate instance with class `elements`" +# shellcheck source=/dev/null +scripts/elasticsearch-test-helpers/create-and-check-es.sh +wait + +PYTHONPATH=. ./unstructured/ingest/main.py \ + s3 \ + --download-dir "$DOWNLOAD_DIR" \ + --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \ + --strategy fast \ + --preserve-downloads \ + --reprocess \ + --output-dir "$OUTPUT_DIR" \ + --verbose \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ + --anonymous \ + --work-dir "$WORK_DIR" \ + weaviate \ + --host-url http://localhost:8080 \ + --class-name pdf_elements \ diff --git a/test_unstructured_ingest/test-ingest.sh b/test_unstructured_ingest/test-ingest.sh index 52fbf7828c..65c08d2df8 100755 --- a/test_unstructured_ingest/test-ingest.sh +++ b/test_unstructured_ingest/test-ingest.sh @@ -43,6 +43,7 @@ all_tests=( 'test-ingest-delta-table.sh' 'test-ingest-jira.sh' 'test-ingest-sharepoint.sh' +'test-ingest-weaviate.sh' 'test-ingest-embed.sh' ) diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py new file mode 100644 index 0000000000..599cb78821 --- /dev/null +++ b/unstructured/ingest/cli/cmds/weaviate.py 
@@ -0,0 +1,58 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import CliMixin, DelimitedString +from unstructured.ingest.interfaces import BaseConfig + +CMD_NAME = "weaviate" + + +@dataclass +class WeaviateCliWriteConfig(BaseConfig, CliMixin): + host_url: str + class_name: str + auth_keys: t.Optional[t.List[str]] = None + additional_keys: t.Optional[t.List[str]] = None + + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--host-url"], + required=True, + help="Weaviate instance url", + ), + click.Option( + ["--class-name"], + default=None, + type=int, + help="Class to ", + ), + click.Option( + ["--auth-keys"], + required=False, + type=DelimitedString(), + help="List of env variables to pull auth keys from. " + "These keys are used to authenticate the client.", + ), + click.Option( + ["--additional-keys"], + is_flag=True, + default=False, + type=DelimitedString(), + help="Additional env vars to initialize the weaviate client with.", + ), + ] + return options + + +def get_base_dest_cmd(): + from unstructured.ingest.cli.base.dest import BaseDestCmd + + cmd_cls = BaseDestCmd( + cmd_name=CMD_NAME, + cli_config=WeaviateCliWriteConfig, + ) + return cmd_cls diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py new file mode 100644 index 0000000000..7be7e1a6f9 --- /dev/null +++ b/unstructured/ingest/connector/weaviate.py @@ -0,0 +1,81 @@ +import json +import typing as t +from dataclasses import dataclass + +from unstructured.ingest.interfaces import ( + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + +BATCH_SIZE = 100 + + +@dataclass +class SimpleWeaviateConfig(BaseConnectorConfig): + host_url: str + auth_keys: t.Optional[t.List[str]] = None + additional_keys: 
t.Optional[t.List[str]] = None + + +@dataclass +class WeaviateWriteConfig(WriteConfig): + class_name: str + + +@dataclass +class WeaviateDestinationConnector(BaseDestinationConnector): + write_config: WeaviateWriteConfig + connector_config: SimpleWeaviateConfig + + @requires_dependencies(["weaviate"], extras="weaviate") + def initialize(self): + from weaviate import Client + + self.client: Client = Client( + url=self.connector_config.host_url, + ) + + def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: + logger.info( + f"writing {len(json_list)} rows to destination " + f"class {self.write_config.class_name} " + f"at {self.write_config.host_url}", + ) + + with self.client.batch(batch_size=BATCH_SIZE) as b: + created = [] + for e in json_list: + created_id = b.add_data_object( + { + "type": e.get("type", ""), + "element_id": e.get("element_id", ""), + "metadata": e.get("metadata", {}), + "text": e.get("text", ""), + }, + self.write_config.class_name, + ) + created.append(created_id) + + if len(created) < len(json_list): + raise ValueError( + f"Missed {len(json_list)- len(created)} elements.", + ) + + logger.info(f"Wrote {len(created)}/{len(json_list)} elements.") + + @requires_dependencies(["deltalake"], extras="delta-table") + def write(self, docs: t.List[BaseIngestDoc]) -> None: + json_list: t.List[t.Dict[str, t.Any]] = [] + for doc in docs: + local_path = doc._output_filename + with open(local_path) as json_file: + json_content = json.load(json_file) + logger.info( + f"appending {len(json_content)} json elements from content in {local_path}", + ) + json_list.extend(json_content) + self.write_dict(json_list=json_list) diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py index 791bbfeefc..b62c4ee3db 100644 --- a/unstructured/ingest/runner/writers.py +++ b/unstructured/ingest/runner/writers.py @@ -76,8 +76,33 @@ def delta_table_writer( ) +@requires_dependencies(["weaviate"], extras="weaviate") 
+def weaviate_writer( + host_url: str, + class_name: str, + auth_key: t.Optional[t.List[str]] = None, + additional_keys: t.Optional[t.List[str]] = None, + **kwargs, +): + from unstructured.ingest.connector.weaviate import ( + SimpleWeaviateConfig, + WeaviateDestinationConnector, + WeaviateWriteConfig, + ) + + return WeaviateDestinationConnector( + write_config=WeaviateWriteConfig(class_name=class_name), + connector_config=SimpleWeaviateConfig( + host_url=host_url, + auth_key=auth_key, + additional_keys=additional_keys, + ), + ) + + writer_map: t.Dict[str, t.Callable] = { "s3": s3_writer, "delta_table": delta_table_writer, "azure_cognitive_search": azure_cognitive_search_writer, + "weaviate": weaviate_writer, } From 1887ebe3b4d4b08b809b4c5231423011ab2dc039 Mon Sep 17 00:00:00 2001 From: rvztz Date: Wed, 1 Nov 2023 04:56:11 -0600 Subject: [PATCH 02/26] added test scripts --- .../create-weaviate-instance.sh | 6 +- .../weaviate-test-helpers/create_schema.py | 63 +++++++++- .../weaviate-test-helpers/docker-compose.yml | 2 +- .../test-ingest-weaviate.sh | 6 +- unstructured/ingest/cli/cmds/__init__.py | 2 + unstructured/ingest/cli/cmds/weaviate.py | 2 +- unstructured/ingest/connector/weaviate.py | 39 ++++++- unstructured/ingest/runner/writers.py | 108 ------------------ .../ingest/runner/writers/__init__.py | 2 + .../ingest/runner/writers/weaviate.py | 27 +++++ 10 files changed, 138 insertions(+), 119 deletions(-) mode change 100644 => 100755 scripts/weaviate-test-helpers/create-weaviate-instance.sh mode change 100644 => 100755 scripts/weaviate-test-helpers/create_schema.py mode change 100644 => 100755 scripts/weaviate-test-helpers/docker-compose.yml delete mode 100644 unstructured/ingest/runner/writers.py create mode 100644 unstructured/ingest/runner/writers/weaviate.py diff --git a/scripts/weaviate-test-helpers/create-weaviate-instance.sh b/scripts/weaviate-test-helpers/create-weaviate-instance.sh old mode 100644 new mode 100755 index 46ea25283a..82da1eeb18 --- 
a/scripts/weaviate-test-helpers/create-weaviate-instance.sh +++ b/scripts/weaviate-test-helpers/create-weaviate-instance.sh @@ -6,8 +6,8 @@ SCRIPT_DIR=$(dirname "$(realpath "$0")") # Create the Weaviate instance docker compose version -docker compose -f "$SCRIPT_DIR"/docker-compose.yaml up --wait -docker compose -f "$SCRIPT_DIR"/docker-compose.yaml ps +docker compose -f "$SCRIPT_DIR"/docker-compose.yml up --wait +docker compose -f "$SCRIPT_DIR"/docker-compose.yml ps echo "Instance is live." -python "$SCRIPT_DIR"/create_schema.py +"$SCRIPT_DIR"/create_schema.py diff --git a/scripts/weaviate-test-helpers/create_schema.py b/scripts/weaviate-test-helpers/create_schema.py old mode 100644 new mode 100755 index 5757eaf970..11873d1864 --- a/scripts/weaviate-test-helpers/create_schema.py +++ b/scripts/weaviate-test-helpers/create_schema.py @@ -1,18 +1,79 @@ +#!/usr/bin/env python3 + import os import weaviate +from unstructured.staging.weaviate import create_unstructured_weaviate_class + weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") class_name = os.getenv("WEAVIATE_CLASS_NAME", "pdf_elements") class_schema = { "class": class_name, "vectorizer": "none", + "properties": [ + {"name": "element_id", "dataType": ["text"]}, + {"name": "text", "dataType": ["text"]}, + {"name": "type", "dataType": ["text"]}, + { + "dataType": ["object"], + "name": "metadata", + "nestedProperties": [ + {"name": "category_depth", "dataType": ["int"]}, + {"name": "parent_id", "dataType": ["text"]}, + {"name": "attached_to_filename", "dataType": ["text"]}, + {"name": "filetype", "dataType": ["text"]}, + {"name": "last_modified", "dataType": ["date"]}, + {"name": "file_directory", "dataType": ["text"]}, + {"name": "filename", "dataType": ["text"]}, + { + "dataType": ["object"], + "name": "data_source", + "nestedProperties": [ + {"name": "url", "dataType": ["text"]}, + {"name": "version", "dataType": ["text"]}, + {"name": "date_created", "dataType": ["date"]}, + {"name": 
"date_modified", "dataType": ["date"]}, + {"name": "date_processed", "dataType": ["date"]}, + {"name": "record_locator", "dataType": ["text"]}, + ], + }, + { + "dataType": ["object"], + "name": "coordinates", + "nestedProperties": [ + {"name": "system", "dataType": ["text"]}, + {"name": "layout_width", "dataType": ["number"]}, + {"name": "layout_height", "dataType": ["number"]}, + {"name": "points", "dataType": ["text"]}, + ], + }, + {"name": "languages", "dataType": ["text[]"]}, + {"name": "page_number", "dataType": ["int"]}, + {"name": "page_name", "dataType": ["text"]}, + {"name": "url", "dataType": ["text"]}, + {"name": "link_urls", "dataType": ["text[]"]}, + {"name": "link_texts", "dataType": ["text[]"]}, + {"name": "sent_from", "dataType": ["text"]}, + {"name": "sent_to", "dataType": ["text"]}, + {"name": "subject", "dataType": ["text"]}, + {"name": "section", "dataType": ["text"]}, + {"name": "header_footer_type", "dataType": ["text"]}, + {"name": "emphasized_text_contents", "dataType": ["text[]"]}, + {"name": "emphasized_text_tags", "dataType": ["text[]"]}, + {"name": "text_as_html", "dataType": ["text"]}, + {"name": "regex_metadata", "dataType": ["text"]}, + {"name": "detection_class_prob", "dataType": ["number"]}, + ], + }, + ], } client = weaviate.Client( url=weaviate_host_url, ) +new_class = create_unstructured_weaviate_class(class_name) if client.schema.exists(class_name): client.schema.delete_class(class_name) -client.schema.create_class(class_schema) +client.schema.create_class(new_class) diff --git a/scripts/weaviate-test-helpers/docker-compose.yml b/scripts/weaviate-test-helpers/docker-compose.yml old mode 100644 new mode 100755 index 042cc4c55e..57bfafab8e --- a/scripts/weaviate-test-helpers/docker-compose.yml +++ b/scripts/weaviate-test-helpers/docker-compose.yml @@ -8,7 +8,7 @@ services: - '8080' - --scheme - http - image: semitechnologies/weaviate:1.21.8 + image: semitechnologies/weaviate:1.22.1 ports: - 8080:8080 restart: on-failure:0 diff 
--git a/test_unstructured_ingest/test-ingest-weaviate.sh b/test_unstructured_ingest/test-ingest-weaviate.sh index f48cace922..b74179acc0 100755 --- a/test_unstructured_ingest/test-ingest-weaviate.sh +++ b/test_unstructured_ingest/test-ingest-weaviate.sh @@ -14,7 +14,7 @@ source "$SCRIPT_DIR"/cleanup.sh function cleanup { # Index cleanup echo "Stopping Weaviate Docker container" - docker-compose -f scripts/weaviate-test-helpers/docker-compose.yaml down --remove-orphans -v + docker-compose -f scripts/weaviate-test-helpers/docker-compose.yml down --remove-orphans -v # Local file cleanup @@ -28,9 +28,9 @@ function cleanup { trap cleanup EXIT # Create weaviate instance and create `elements` class -echo "Creating weaviate instance with class `elements`" +echo "Creating weaviate instance" # shellcheck source=/dev/null -scripts/elasticsearch-test-helpers/create-and-check-es.sh +scripts/weaviate-test-helpers/create-weaviate-instance.sh wait PYTHONPATH=. ./unstructured/ingest/main.py \ diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index 7a75a5666b..9c67b7e274 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -37,6 +37,7 @@ from .salesforce import get_base_src_cmd as salesforce_base_src_cmd from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd from .slack import get_base_src_cmd as slack_base_src_cmd +from .weaviate import get_base_dest_cmd as weaviate_dest_cmd from .wikipedia import get_base_src_cmd as wikipedia_base_src_cmd if t.TYPE_CHECKING: @@ -89,6 +90,7 @@ s3_base_dest_cmd, azure_cognitive_search_base_dest_cmd, delta_table_dest_cmd, + weaviate_dest_cmd, ] # Make sure there are not overlapping names diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index 599cb78821..2813e72091 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -27,7 +27,7 @@ def 
get_cli_options() -> t.List[click.Option]: click.Option( ["--class-name"], default=None, - type=int, + type=str, help="Class to ", ), click.Option( diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 7be7e1a6f9..88cae78ca1 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -1,4 +1,5 @@ import json +import os import typing as t from dataclasses import dataclass @@ -20,6 +21,17 @@ class SimpleWeaviateConfig(BaseConnectorConfig): auth_keys: t.Optional[t.List[str]] = None additional_keys: t.Optional[t.List[str]] = None + def __post_init__(self): + if self.auth_keys: + self.auth_keys_dict = { + k: os.getenv(k) for k in self.auth_keys if (os.getenv(k) is not None) + } + + if self.additional_keys: + self.additional_keys_dict = { + k: os.getenv(k) for k in self.additional_keys if (os.getenv(k) is not None) + } + @dataclass class WeaviateWriteConfig(WriteConfig): @@ -39,16 +51,38 @@ def initialize(self): url=self.connector_config.host_url, ) + def conform_dict(self, element: dict) -> None: + """ + Updates the element dictionary to conform to the Weaviate schema + """ + + if ( + record_locator := element.get("metadata", {}) + .get("data_source", {}) + .get("record_locator") + ): + # Explicit casting otherwise fails schema type checking + element["metadata"]["data_source"]["record_locator"] = str(json.dumps(record_locator)) + + if ( + date_modified := element.get("metadata", {}) + .get("data_source", {}) + .get("date_modified", None) + ): + element["metadata"]["data_source"]["date_modified"] = date_modified + "Z" + def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: logger.info( f"writing {len(json_list)} rows to destination " f"class {self.write_config.class_name} " - f"at {self.write_config.host_url}", + f"at {self.connector_config.host_url}", ) with self.client.batch(batch_size=BATCH_SIZE) as b: created = [] for e in json_list: + 
self.conform_dict(e) + print(e.get("metadata", {}).keys()) created_id = b.add_data_object( { "type": e.get("type", ""), @@ -57,6 +91,7 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> "text": e.get("text", ""), }, self.write_config.class_name, + vector=e.get("embeddings"), ) created.append(created_id) @@ -67,7 +102,7 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> logger.info(f"Wrote {len(created)}/{len(json_list)} elements.") - @requires_dependencies(["deltalake"], extras="delta-table") + @requires_dependencies(["weaviate"], extras="weaviate") def write(self, docs: t.List[BaseIngestDoc]) -> None: json_list: t.List[t.Dict[str, t.Any]] = [] for doc in docs: diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py deleted file mode 100644 index b62c4ee3db..0000000000 --- a/unstructured/ingest/runner/writers.py +++ /dev/null @@ -1,108 +0,0 @@ -import typing as t -from pathlib import Path - -from unstructured.ingest.interfaces import WriteConfig -from unstructured.utils import requires_dependencies - - -@requires_dependencies(["s3fs", "fsspec"], extras="s3") -def s3_writer( - remote_url: str, - anonymous: bool, - endpoint_url: t.Optional[str] = None, - verbose: bool = False, - **kwargs, -): - from unstructured.ingest.connector.s3 import ( - S3DestinationConnector, - SimpleS3Config, - ) - - access_kwargs: t.Dict[str, t.Any] = {"anon": anonymous} - if endpoint_url: - access_kwargs["endpoint_url"] = endpoint_url - - return S3DestinationConnector( - write_config=WriteConfig(), - connector_config=SimpleS3Config( - remote_url=remote_url, - access_kwargs=access_kwargs, - ), - ) - - -@requires_dependencies(["azure"], extras="azure-cognitive-search") -def azure_cognitive_search_writer( - endpoint: str, - key: str, - index: str, - **kwargs, -): - from unstructured.ingest.connector.azure_cognitive_search import ( - AzureCognitiveSearchDestinationConnector, - 
AzureCognitiveSearchWriteConfig, - SimpleAzureCognitiveSearchStorageConfig, - ) - - return AzureCognitiveSearchDestinationConnector( - write_config=AzureCognitiveSearchWriteConfig( - index=index, - ), - connector_config=SimpleAzureCognitiveSearchStorageConfig( - endpoint=endpoint, - key=key, - ), - ) - - -@requires_dependencies(["deltalake"], extras="delta-table") -def delta_table_writer( - table_uri: t.Union[str, Path], - write_column: str, - mode: t.Literal["error", "append", "overwrite", "ignore"] = "error", - **kwargs, -): - from unstructured.ingest.connector.delta_table import ( - DeltaTableDestinationConnector, - DeltaTableWriteConfig, - SimpleDeltaTableConfig, - ) - - return DeltaTableDestinationConnector( - write_config=DeltaTableWriteConfig(write_column=write_column, mode=mode), - connector_config=SimpleDeltaTableConfig( - table_uri=table_uri, - ), - ) - - -@requires_dependencies(["weaviate"], extras="weaviate") -def weaviate_writer( - host_url: str, - class_name: str, - auth_key: t.Optional[t.List[str]] = None, - additional_keys: t.Optional[t.List[str]] = None, - **kwargs, -): - from unstructured.ingest.connector.weaviate import ( - SimpleWeaviateConfig, - WeaviateDestinationConnector, - WeaviateWriteConfig, - ) - - return WeaviateDestinationConnector( - write_config=WeaviateWriteConfig(class_name=class_name), - connector_config=SimpleWeaviateConfig( - host_url=host_url, - auth_key=auth_key, - additional_keys=additional_keys, - ), - ) - - -writer_map: t.Dict[str, t.Callable] = { - "s3": s3_writer, - "delta_table": delta_table_writer, - "azure_cognitive_search": azure_cognitive_search_writer, - "weaviate": weaviate_writer, -} diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py index 701d77dbe2..5e22da9197 100644 --- a/unstructured/ingest/runner/writers/__init__.py +++ b/unstructured/ingest/runner/writers/__init__.py @@ -7,6 +7,7 @@ from .dropbox import dropbox_writer from .gcs import gcs_writer from 
.s3 import s3_writer +from .weaviate import weaviate_writer writer_map: t.Dict[str, t.Callable] = { "azure": azure_writer, @@ -16,6 +17,7 @@ "dropbox": dropbox_writer, "gcs": gcs_writer, "s3": s3_writer, + "weaviate": weaviate_writer, } __all__ = ["writer_map"] diff --git a/unstructured/ingest/runner/writers/weaviate.py b/unstructured/ingest/runner/writers/weaviate.py new file mode 100644 index 0000000000..654ed561a1 --- /dev/null +++ b/unstructured/ingest/runner/writers/weaviate.py @@ -0,0 +1,27 @@ +import typing as t + +from unstructured.utils import requires_dependencies + + +@requires_dependencies(["weaviate"], extras="weaviate") +def weaviate_writer( + host_url: str, + class_name: str, + auth_keys: t.Optional[t.List[str]] = None, + additional_keys: t.Optional[t.List[str]] = None, + **kwargs, +): + from unstructured.ingest.connector.weaviate import ( + SimpleWeaviateConfig, + WeaviateDestinationConnector, + WeaviateWriteConfig, + ) + + return WeaviateDestinationConnector( + write_config=WeaviateWriteConfig(class_name=class_name), + connector_config=SimpleWeaviateConfig( + host_url=host_url, + auth_keys=auth_keys, + additional_keys=additional_keys, + ), + ) From 8dc3f45ec95417cd2508b74a6ebf743113b1ea97 Mon Sep 17 00:00:00 2001 From: rvztz Date: Fri, 3 Nov 2023 04:48:42 -0600 Subject: [PATCH 03/26] Adds check to validate the number of written pdf elemeents --- .../weaviate-test-helpers/create_schema.py | 70 +-- .../weaviate-test-helpers/pdf_elements.json | 413 ++++++++++++++++++ .../test-ingest-weaviate-output.py | 19 + test_unstructured_ingest/dest/weaviate.sh | 16 +- unstructured/ingest/cli/cmds/weaviate.py | 12 +- unstructured/ingest/cli/interfaces.py | 21 +- unstructured/ingest/connector/weaviate.py | 59 ++- .../ingest/runner/writers/weaviate.py | 4 +- 8 files changed, 514 insertions(+), 100 deletions(-) create mode 100644 scripts/weaviate-test-helpers/pdf_elements.json create mode 100755 scripts/weaviate-test-helpers/test-ingest-weaviate-output.py diff --git 
a/scripts/weaviate-test-helpers/create_schema.py b/scripts/weaviate-test-helpers/create_schema.py index 11873d1864..cb1b2d567c 100755 --- a/scripts/weaviate-test-helpers/create_schema.py +++ b/scripts/weaviate-test-helpers/create_schema.py @@ -1,79 +1,21 @@ #!/usr/bin/env python3 +import json import os import weaviate -from unstructured.staging.weaviate import create_unstructured_weaviate_class - weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") -class_name = os.getenv("WEAVIATE_CLASS_NAME", "pdf_elements") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "Pdf_elements") +new_class = None -class_schema = { - "class": class_name, - "vectorizer": "none", - "properties": [ - {"name": "element_id", "dataType": ["text"]}, - {"name": "text", "dataType": ["text"]}, - {"name": "type", "dataType": ["text"]}, - { - "dataType": ["object"], - "name": "metadata", - "nestedProperties": [ - {"name": "category_depth", "dataType": ["int"]}, - {"name": "parent_id", "dataType": ["text"]}, - {"name": "attached_to_filename", "dataType": ["text"]}, - {"name": "filetype", "dataType": ["text"]}, - {"name": "last_modified", "dataType": ["date"]}, - {"name": "file_directory", "dataType": ["text"]}, - {"name": "filename", "dataType": ["text"]}, - { - "dataType": ["object"], - "name": "data_source", - "nestedProperties": [ - {"name": "url", "dataType": ["text"]}, - {"name": "version", "dataType": ["text"]}, - {"name": "date_created", "dataType": ["date"]}, - {"name": "date_modified", "dataType": ["date"]}, - {"name": "date_processed", "dataType": ["date"]}, - {"name": "record_locator", "dataType": ["text"]}, - ], - }, - { - "dataType": ["object"], - "name": "coordinates", - "nestedProperties": [ - {"name": "system", "dataType": ["text"]}, - {"name": "layout_width", "dataType": ["number"]}, - {"name": "layout_height", "dataType": ["number"]}, - {"name": "points", "dataType": ["text"]}, - ], - }, - {"name": "languages", "dataType": ["text[]"]}, - {"name": "page_number", 
"dataType": ["int"]}, - {"name": "page_name", "dataType": ["text"]}, - {"name": "url", "dataType": ["text"]}, - {"name": "link_urls", "dataType": ["text[]"]}, - {"name": "link_texts", "dataType": ["text[]"]}, - {"name": "sent_from", "dataType": ["text"]}, - {"name": "sent_to", "dataType": ["text"]}, - {"name": "subject", "dataType": ["text"]}, - {"name": "section", "dataType": ["text"]}, - {"name": "header_footer_type", "dataType": ["text"]}, - {"name": "emphasized_text_contents", "dataType": ["text[]"]}, - {"name": "emphasized_text_tags", "dataType": ["text[]"]}, - {"name": "text_as_html", "dataType": ["text"]}, - {"name": "regex_metadata", "dataType": ["text"]}, - {"name": "detection_class_prob", "dataType": ["number"]}, - ], - }, - ], -} +with open("./scripts/weaviate-test-helpers/pdf_elements.json") as f: + new_class = json.load(f) client = weaviate.Client( url=weaviate_host_url, ) -new_class = create_unstructured_weaviate_class(class_name) + if client.schema.exists(class_name): client.schema.delete_class(class_name) client.schema.create_class(new_class) diff --git a/scripts/weaviate-test-helpers/pdf_elements.json b/scripts/weaviate-test-helpers/pdf_elements.json new file mode 100644 index 0000000000..93b3656481 --- /dev/null +++ b/scripts/weaviate-test-helpers/pdf_elements.json @@ -0,0 +1,413 @@ +{ + "class": "Pdf_elements", + "invertedIndexConfig": { + "bm25": { + "b": 0.75, + "k1": 1.2 + }, + "cleanupIntervalSeconds": 60, + "stopwords": { + "additions": null, + "preset": "en", + "removals": null + } + }, + "multiTenancyConfig": { + "enabled": false + }, + "properties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "element_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "type", + 
"tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "metadata", + "nestedProperties": [ + { + "dataType": [ + "int" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "category_depth" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "parent_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "attached_to_filename", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filetype", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "last_modified" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "file_directory", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filename", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "data_source", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "version", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_created" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_modified" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_processed" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": 
"record_locator", + "tokenization": "word" + } + ] + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "coordinates", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "system", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_width" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_height" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "points", + "tokenization": "word" + } + ] + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "languages", + "tokenization": "word" + }, + { + "dataType": [ + "int" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "page_number" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "page_name", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "links", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_urls", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_texts", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_from", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_to", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], 
+ "indexFilterable": true, + "indexSearchable": true, + "name": "subject", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "section", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "header_footer_type", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_contents", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_tags", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text_as_html", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "regex_metadata", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "detection_class_prob" + } + ] + } + ], + "replicationConfig": { + "factor": 1 + }, + "shardingConfig": { + "virtualPerPhysical": 128, + "desiredCount": 1, + "actualCount": 1, + "desiredVirtualCount": 128, + "actualVirtualCount": 128, + "key": "_id", + "strategy": "hash", + "function": "murmur3" + }, + "vectorIndexConfig": { + "skip": false, + "cleanupIntervalSeconds": 300, + "maxConnections": 64, + "efConstruction": 128, + "ef": -1, + "dynamicEfMin": 100, + "dynamicEfMax": 500, + "dynamicEfFactor": 8, + "vectorCacheMaxObjects": 1000000000000, + "flatSearchCutoff": 40000, + "distance": "cosine", + "pq": { + "enabled": false, + "bitCompression": false, + "segments": 0, + "centroids": 256, + "trainingLimit": 100000, + "encoder": { + "type": "kmeans", + "distribution": "log-normal" + } + } + }, + "vectorIndexType": "hnsw", + "vectorizer": "none" +} \ No newline at end of file diff --git 
a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py new file mode 100755 index 0000000000..f20aa5d290 --- /dev/null +++ b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +import os + +import weaviate + +weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "Pdf_elements") + +if __name__ == "__main__": + print(f"Checking contents of class collection " f"{class_name} at {weaviate_host_url}") + + client = weaviate.Client( + url=weaviate_host_url, + ) + + response = client.query.aggregate(class_name).with_meta_count().do() + assert response["data"]["Aggregate"][class_name][0]["meta"]["count"] == 605 + print("weaviate dest check complete") diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index b74179acc0..1de0554f43 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -2,12 +2,14 @@ set -e -SCRIPT_DIR=$(dirname "$(realpath "$0")") +DEST_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$DEST_PATH") cd "$SCRIPT_DIR"/.. || exit 1 -OUTPUT_FOLDER_NAME=s3-weaviate-dest -OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME -WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME -DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME +OUTPUT_FOLDER_NAME=weaviate-dest +OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR} +OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME +CI=${CI:-"false"} # shellcheck disable=SC1091 source "$SCRIPT_DIR"/cleanup.sh @@ -36,7 +38,7 @@ wait PYTHONPATH=. 
./unstructured/ingest/main.py \ s3 \ --download-dir "$DOWNLOAD_DIR" \ - --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \ + --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth,metadata.links \ --strategy fast \ --preserve-downloads \ --reprocess \ @@ -48,3 +50,5 @@ PYTHONPATH=. ./unstructured/ingest/main.py \ weaviate \ --host-url http://localhost:8080 \ --class-name pdf_elements \ + +scripts/weaviate-test-helpers/test-ingest-weaviate-output.py \ No newline at end of file diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index 2813e72091..a3e974d39b 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -3,7 +3,7 @@ import click -from unstructured.ingest.cli.interfaces import CliMixin, DelimitedString +from unstructured.ingest.cli.interfaces import CliMixin, Dict from unstructured.ingest.interfaces import BaseConfig CMD_NAME = "weaviate" @@ -14,7 +14,7 @@ class WeaviateCliWriteConfig(BaseConfig, CliMixin): host_url: str class_name: str auth_keys: t.Optional[t.List[str]] = None - additional_keys: t.Optional[t.List[str]] = None + additional_headers: t.Optional[t.List[str]] = None @staticmethod def get_cli_options() -> t.List[click.Option]: @@ -31,17 +31,13 @@ def get_cli_options() -> t.List[click.Option]: help="Class to ", ), click.Option( - ["--auth-keys"], - required=False, - type=DelimitedString(), - help="List of env variables to pull auth keys from. 
" - "These keys are used to authenticate the client.", + ["--auth-keys"], required=False, type=Dict(), help="Key,value pairs representing" ), click.Option( ["--additional-keys"], is_flag=True, default=False, - type=DelimitedString(), + type=Dict(), help="Additional env vars to initialize the weaviate client with.", ), ] diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py index 1f61fe9a6a..b42de847ce 100644 --- a/unstructured/ingest/cli/interfaces.py +++ b/unstructured/ingest/cli/interfaces.py @@ -25,6 +25,9 @@ class Dict(click.ParamType): name = "dict" + def __init__(self, choices: t.Optional[t.List[str]] = None): + self.choices = choices if choices else [] + def convert( self, value: t.Any, @@ -32,7 +35,23 @@ def convert( ctx: t.Optional[click.Context], ) -> t.Any: try: - return json.loads(value) + d = json.loads(value) + if not self.choices: + return d + choices_str = ", ".join(map(repr, self.choices)) + for k in list(d.keys()): + if k not in self.choices: + self.fail( + ngettext( + "{value!r} is not {choice}.", + "{value!r} is not one of {choices}.", + len(self.choices), + ).format(value=k, choice=choices_str, choices=choices_str), + param, + ctx, + ) + return d + except json.JSONDecodeError: self.fail( gettext( diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 88cae78ca1..e3acd12fff 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -1,5 +1,4 @@ import json -import os import typing as t from dataclasses import dataclass @@ -18,19 +17,8 @@ @dataclass class SimpleWeaviateConfig(BaseConnectorConfig): host_url: str - auth_keys: t.Optional[t.List[str]] = None - additional_keys: t.Optional[t.List[str]] = None - - def __post_init__(self): - if self.auth_keys: - self.auth_keys_dict = { - k: os.getenv(k) for k in self.auth_keys if (os.getenv(k) is not None) - } - - if self.additional_keys: - self.additional_keys_dict = { - 
k: os.getenv(k) for k in self.additional_keys if (os.getenv(k) is not None) - } + auth_keys: t.Optional[t.Dict[str, str]] = None + additional_headers: t.Optional[t.Dict[str, str]] = None @dataclass @@ -47,10 +35,45 @@ class WeaviateDestinationConnector(BaseDestinationConnector): def initialize(self): from weaviate import Client + auth = self._resolve_auth_method() + self.client: Client = Client( url=self.connector_config.host_url, + auth_client_secret=auth, + additional_headers=self.connector_config.additional_headers, ) + def _resolve_auth_method(self): + if self.connector_config.auth_keys is None: + return None + + if access_token := self.connector_config.auth_keys.get("access_token"): + from weaviate.auth import AuthBearerToken + + return AuthBearerToken( + access_token=access_token, + refresh_token=self.connector_config.auth_keys.get("refresh_token"), + ) + elif api_key := self.connector_config.auth_keys.get("api_key"): + from weaviate.auth import AuthApiKey + + return AuthApiKey(api_key=api_key) + elif client_secret := self.connector_config.auth_keys.get("client_secret"): + from weaviate.auth import AuthClientCredentials + + return AuthClientCredentials( + client_secret=client_secret, scope=self.connector_config.auth_keys.get("scope") + ) + elif (username := self.connector_config.auth_keys.get("username")) and ( + pwd := self.connector_config.auth_keys.get("password") + ): + from weaviate.auth import AuthClientPassword + + return AuthClientPassword( + username=username, password=pwd, scope=self.connector_config.auth_keys.get("scope") + ) + return None + def conform_dict(self, element: dict) -> None: """ Updates the element dictionary to conform to the Weaviate schema @@ -73,16 +96,15 @@ def conform_dict(self, element: dict) -> None: def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: logger.info( - f"writing {len(json_list)} rows to destination " + f"writing {len(json_list)} objects to destination " f"class 
{self.write_config.class_name} " f"at {self.connector_config.host_url}", ) - - with self.client.batch(batch_size=BATCH_SIZE) as b: + self.client.batch.configure(batch_size=BATCH_SIZE) + with self.client.batch as b: created = [] for e in json_list: self.conform_dict(e) - print(e.get("metadata", {}).keys()) created_id = b.add_data_object( { "type": e.get("type", ""), @@ -94,7 +116,6 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> vector=e.get("embeddings"), ) created.append(created_id) - if len(created) < len(json_list): raise ValueError( f"Missed {len(json_list)- len(created)} elements.", diff --git a/unstructured/ingest/runner/writers/weaviate.py b/unstructured/ingest/runner/writers/weaviate.py index 654ed561a1..d3cb617c16 100644 --- a/unstructured/ingest/runner/writers/weaviate.py +++ b/unstructured/ingest/runner/writers/weaviate.py @@ -8,7 +8,7 @@ def weaviate_writer( host_url: str, class_name: str, auth_keys: t.Optional[t.List[str]] = None, - additional_keys: t.Optional[t.List[str]] = None, + additional_headers: t.Optional[t.List[str]] = None, **kwargs, ): from unstructured.ingest.connector.weaviate import ( @@ -22,6 +22,6 @@ def weaviate_writer( connector_config=SimpleWeaviateConfig( host_url=host_url, auth_keys=auth_keys, - additional_keys=additional_keys, + additional_headers=additional_headers, ), ) From 6d56877a81cec46b1bcc5329844b45d26601f27b Mon Sep 17 00:00:00 2001 From: rvztz Date: Fri, 3 Nov 2023 06:19:32 -0600 Subject: [PATCH 04/26] Fixes notes on cli interface. Adds batch_size as a cli command. 
--- examples/ingest/weaviate/ingest.sh | 1 + test_unstructured_ingest/dest/weaviate.sh | 1 - unstructured/ingest/cli/cmds/weaviate.py | 41 ++++++++++++------- unstructured/ingest/connector/weaviate.py | 15 +++---- .../ingest/runner/writers/weaviate.py | 8 ++-- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh index ca42287cf0..cb8d6e0caa 100644 --- a/examples/ingest/weaviate/ingest.sh +++ b/examples/ingest/weaviate/ingest.sh @@ -18,3 +18,4 @@ PYTHONPATH=. ./unstructured/ingest/main.py \ weaviate \ --host-url http://localhost:8080 \ --class-name pdf_elements \ + --batch-size 100 diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 1de0554f43..27c8aad568 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -38,7 +38,6 @@ wait PYTHONPATH=. ./unstructured/ingest/main.py \ s3 \ --download-dir "$DOWNLOAD_DIR" \ - --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth,metadata.links \ --strategy fast \ --preserve-downloads \ --reprocess \ diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index a3e974d39b..1e7e7dcaec 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -3,18 +3,19 @@ import click -from unstructured.ingest.cli.interfaces import CliMixin, Dict -from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.cli.interfaces import ( + CliConfig, + Dict +) +from unstructured.ingest.connector.weaviate import SimpleWeaviateConfig CMD_NAME = "weaviate" - @dataclass -class WeaviateCliWriteConfig(BaseConfig, CliMixin): +class WeaviateCliConfig(CliConfig): host_url: str class_name: str - auth_keys: t.Optional[t.List[str]] = None - 
additional_headers: t.Optional[t.List[str]] = None + auth_keys: t.Optional[t.Dict[str, str]] = None @staticmethod def get_cli_options() -> t.List[click.Option]: @@ -28,18 +29,27 @@ def get_cli_options() -> t.List[click.Option]: ["--class-name"], default=None, type=str, - help="Class to ", + help="Target class collection name", ), click.Option( ["--auth-keys"], required=False, type=Dict(), help="Key,value pairs representing" - ), + ) + ] + return options + +@dataclass +class WeaviateCliWriteConfig(CliConfig): + batch_size: int + + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ click.Option( - ["--additional-keys"], - is_flag=True, - default=False, - type=Dict(), - help="Additional env vars to initialize the weaviate client with.", - ), + ["--batch-size"], + default=100, + type=int, + help="Batch insert size", + ) ] return options @@ -49,6 +59,7 @@ def get_base_dest_cmd(): cmd_cls = BaseDestCmd( cmd_name=CMD_NAME, - cli_config=WeaviateCliWriteConfig, + cli_config=WeaviateCliConfig, + additional_cli_options=[WeaviateCliWriteConfig] ) return cmd_cls diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index e3acd12fff..722fa5fac1 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -11,19 +11,15 @@ from unstructured.ingest.logger import logger from unstructured.utils import requires_dependencies -BATCH_SIZE = 100 - - @dataclass class SimpleWeaviateConfig(BaseConnectorConfig): host_url: str + class_name: str auth_keys: t.Optional[t.Dict[str, str]] = None - additional_headers: t.Optional[t.Dict[str, str]] = None - @dataclass class WeaviateWriteConfig(WriteConfig): - class_name: str + batch_size: int = 100 @dataclass @@ -40,7 +36,6 @@ def initialize(self): self.client: Client = Client( url=self.connector_config.host_url, auth_client_secret=auth, - additional_headers=self.connector_config.additional_headers, ) def _resolve_auth_method(self): @@ 
-97,10 +92,10 @@ def conform_dict(self, element: dict) -> None: def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: logger.info( f"writing {len(json_list)} objects to destination " - f"class {self.write_config.class_name} " + f"class {self.connector_config.class_name} " f"at {self.connector_config.host_url}", ) - self.client.batch.configure(batch_size=BATCH_SIZE) + self.client.batch.configure(batch_size=self.write_config.batch_size) with self.client.batch as b: created = [] for e in json_list: @@ -112,7 +107,7 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> "metadata": e.get("metadata", {}), "text": e.get("text", ""), }, - self.write_config.class_name, + self.connector_config.class_name, vector=e.get("embeddings"), ) created.append(created_id) diff --git a/unstructured/ingest/runner/writers/weaviate.py b/unstructured/ingest/runner/writers/weaviate.py index d3cb617c16..ecad9edc7c 100644 --- a/unstructured/ingest/runner/writers/weaviate.py +++ b/unstructured/ingest/runner/writers/weaviate.py @@ -7,8 +7,8 @@ def weaviate_writer( host_url: str, class_name: str, + batch_size: int = 100, auth_keys: t.Optional[t.List[str]] = None, - additional_headers: t.Optional[t.List[str]] = None, **kwargs, ): from unstructured.ingest.connector.weaviate import ( @@ -18,10 +18,12 @@ def weaviate_writer( ) return WeaviateDestinationConnector( - write_config=WeaviateWriteConfig(class_name=class_name), + write_config=WeaviateWriteConfig( + batch_size=batch_size + ), connector_config=SimpleWeaviateConfig( host_url=host_url, + class_name=class_name, auth_keys=auth_keys, - additional_headers=additional_headers, ), ) From 0a29da43b9fccf930da54c905c20932228c1edbc Mon Sep 17 00:00:00 2001 From: rvztz Date: Fri, 3 Nov 2023 06:19:56 -0600 Subject: [PATCH 05/26] linting --- unstructured/ingest/cli/cmds/weaviate.py | 12 +++++------- unstructured/ingest/connector/weaviate.py | 2 ++ unstructured/ingest/runner/writers/weaviate.py | 4 
+--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index 1e7e7dcaec..323c0284d6 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -3,14 +3,11 @@ import click -from unstructured.ingest.cli.interfaces import ( - CliConfig, - Dict -) -from unstructured.ingest.connector.weaviate import SimpleWeaviateConfig +from unstructured.ingest.cli.interfaces import CliConfig, Dict CMD_NAME = "weaviate" + @dataclass class WeaviateCliConfig(CliConfig): host_url: str @@ -33,10 +30,11 @@ def get_cli_options() -> t.List[click.Option]: ), click.Option( ["--auth-keys"], required=False, type=Dict(), help="Key,value pairs representing" - ) + ), ] return options + @dataclass class WeaviateCliWriteConfig(CliConfig): batch_size: int @@ -60,6 +58,6 @@ def get_base_dest_cmd(): cmd_cls = BaseDestCmd( cmd_name=CMD_NAME, cli_config=WeaviateCliConfig, - additional_cli_options=[WeaviateCliWriteConfig] + additional_cli_options=[WeaviateCliWriteConfig], ) return cmd_cls diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 722fa5fac1..d6ab2d21b1 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -11,12 +11,14 @@ from unstructured.ingest.logger import logger from unstructured.utils import requires_dependencies + @dataclass class SimpleWeaviateConfig(BaseConnectorConfig): host_url: str class_name: str auth_keys: t.Optional[t.Dict[str, str]] = None + @dataclass class WeaviateWriteConfig(WriteConfig): batch_size: int = 100 diff --git a/unstructured/ingest/runner/writers/weaviate.py b/unstructured/ingest/runner/writers/weaviate.py index ecad9edc7c..39dc242354 100644 --- a/unstructured/ingest/runner/writers/weaviate.py +++ b/unstructured/ingest/runner/writers/weaviate.py @@ -18,9 +18,7 @@ def weaviate_writer( ) return WeaviateDestinationConnector( - 
write_config=WeaviateWriteConfig( - batch_size=batch_size - ), + write_config=WeaviateWriteConfig(batch_size=batch_size), connector_config=SimpleWeaviateConfig( host_url=host_url, class_name=class_name, From 988790769c8eaad68cce19a2d52937c82a3af897 Mon Sep 17 00:00:00 2001 From: rvztz Date: Fri, 3 Nov 2023 06:26:09 -0600 Subject: [PATCH 06/26] Removes changes on cli interfaces Dict() type --- unstructured/ingest/cli/interfaces.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py index b42de847ce..a1ec39e11a 100644 --- a/unstructured/ingest/cli/interfaces.py +++ b/unstructured/ingest/cli/interfaces.py @@ -35,22 +35,7 @@ def convert( ctx: t.Optional[click.Context], ) -> t.Any: try: - d = json.loads(value) - if not self.choices: - return d - choices_str = ", ".join(map(repr, self.choices)) - for k in list(d.keys()): - if k not in self.choices: - self.fail( - ngettext( - "{value!r} is not {choice}.", - "{value!r} is not one of {choices}.", - len(self.choices), - ).format(value=k, choice=choices_str, choices=choices_str), - param, - ctx, - ) - return d + return json.loads(value) except json.JSONDecodeError: self.fail( From 008b732fe606157aff2ff841c0da12d972243197 Mon Sep 17 00:00:00 2001 From: rvztz Date: Fri, 3 Nov 2023 22:24:33 -0600 Subject: [PATCH 07/26] Adds `DestinationConnectionError` to the weaviate dest connector --- CHANGELOG.md | 3 ++- Makefile | 2 +- examples/ingest/weaviate/ingest.sh | 4 ++-- .../test-ingest-weaviate-output.py | 7 ++++++- unstructured/__version__.py | 2 +- unstructured/ingest/cli/cmds/weaviate.py | 15 ++++++++++++--- unstructured/ingest/cli/interfaces.py | 4 ---- unstructured/ingest/connector/weaviate.py | 14 ++++---------- 8 files changed, 28 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af276ffe48..77b9867822 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.10.29-dev12 +## 
0.10.29-dev13 ### Enhancements @@ -11,6 +11,7 @@ ### Features * **Allow setting table crop parameter** In certain circumstances, adjusting the table crop padding may improve table. +* **Weaviate destination connector** Weaviate connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to a Weaviate object collection. ### Fixes diff --git a/Makefile b/Makefile index 094098d929..164733c819 100644 --- a/Makefile +++ b/Makefile @@ -193,7 +193,7 @@ install-ingest-sharepoint: .PHONY: install-ingest-weaviate install-ingest-weaviate: - python3 -m pip install -r requirements/ingest/ingest-weaviate.txt + python3 -m pip install -r requirements/ingest/weaviate.txt .PHONY: install-ingest-local install-ingest-local: diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh index cb8d6e0caa..8e43fd0461 100644 --- a/examples/ingest/weaviate/ingest.sh +++ b/examples/ingest/weaviate/ingest.sh @@ -1,8 +1,8 @@ #!/usr/bin/env bash -# Uploads the structured output of the files within the given S3 path. +# Uploads the structured output of the files within the given S3 path to a Weaviate index. -# Structured outputs are stored in azure-ingest-output/ +# Structured outputs are stored in s3-small-batch-output-to-weaviate/ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) cd "$SCRIPT_DIR"/../../.. 
|| exit 1 diff --git a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py index f20aa5d290..1c7d9efb62 100755 --- a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py +++ b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py @@ -6,6 +6,7 @@ weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") class_name = os.getenv("WEAVIATE_CLASS_NAME", "Pdf_elements") +N_ELEMENTS = 605 if __name__ == "__main__": print(f"Checking contents of class collection " f"{class_name} at {weaviate_host_url}") @@ -15,5 +16,9 @@ ) response = client.query.aggregate(class_name).with_meta_count().do() - assert response["data"]["Aggregate"][class_name][0]["meta"]["count"] == 605 + count = response["data"]["Aggregate"][class_name][0]["meta"]["count"] + try: + assert count == N_ELEMENTS + except AssertionError: + print(f"weaviate dest check failed: expected {N_ELEMENTS}, got {count}") print("weaviate dest check complete") diff --git a/unstructured/__version__.py b/unstructured/__version__.py index e2a80eb04b..4d8efa8de1 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.10.29-dev12" # pragma: no cover +__version__ = "0.10.29-dev13" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index 323c0284d6..44b646d1a1 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -26,10 +26,19 @@ def get_cli_options() -> t.List[click.Option]: ["--class-name"], default=None, type=str, - help="Target class collection name", + help="Name of the class to push the records into, e.g: Pdf-elements", ), click.Option( - ["--auth-keys"], required=False, type=Dict(), help="Key,value pairs representing" + ["--auth-keys"], + required=False, + type=Dict(), + help=( + "String representing a JSON-like dict with key,value containing " + "the required 
parameters to create an authentication object. " + "The connector resolves the type of authentication object based on the parameters. " + "See https://weaviate.io/developers/weaviate/client-libraries/python_v3#api-key-authentication " + "for more information." + ), ), ] return options @@ -46,7 +55,7 @@ def get_cli_options() -> t.List[click.Option]: ["--batch-size"], default=100, type=int, - help="Batch insert size", + help="Number of records per batch", ) ] return options diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py index a1ec39e11a..1f61fe9a6a 100644 --- a/unstructured/ingest/cli/interfaces.py +++ b/unstructured/ingest/cli/interfaces.py @@ -25,9 +25,6 @@ class Dict(click.ParamType): name = "dict" - def __init__(self, choices: t.Optional[t.List[str]] = None): - self.choices = choices if choices else [] - def convert( self, value: t.Any, @@ -36,7 +33,6 @@ def convert( ) -> t.Any: try: return json.loads(value) - except json.JSONDecodeError: self.fail( gettext( diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index d6ab2d21b1..4a47217dc0 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -2,6 +2,7 @@ import typing as t from dataclasses import dataclass +from unstructured.ingest.error import DestinationConnectionError from unstructured.ingest.interfaces import ( BaseConnectorConfig, BaseDestinationConnector, @@ -30,15 +31,12 @@ class WeaviateDestinationConnector(BaseDestinationConnector): connector_config: SimpleWeaviateConfig @requires_dependencies(["weaviate"], extras="weaviate") + @DestinationConnectionError.wrap def initialize(self): from weaviate import Client auth = self._resolve_auth_method() - - self.client: Client = Client( - url=self.connector_config.host_url, - auth_client_secret=auth, - ) + self.client: Client = Client(url=self.connector_config.host_url, auth_client_secret=auth) def 
_resolve_auth_method(self): if self.connector_config.auth_keys is None: @@ -113,12 +111,8 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> vector=e.get("embeddings"), ) created.append(created_id) - if len(created) < len(json_list): - raise ValueError( - f"Missed {len(json_list)- len(created)} elements.", - ) - logger.info(f"Wrote {len(created)}/{len(json_list)} elements.") + logger.info(f"Wrote {len(created)} elements.") @requires_dependencies(["weaviate"], extras="weaviate") def write(self, docs: t.List[BaseIngestDoc]) -> None: From 8a0cc4bb4938d5c3d665509bdf8dfc4f9a9408f9 Mon Sep 17 00:00:00 2001 From: rvztz Date: Wed, 8 Nov 2023 01:44:26 -0600 Subject: [PATCH 08/26] Adds formatting logic to the `conform_dict` method on the weaviate connector. Renames test table from `Pdf_elements` to `Elements`. Adds `weaviate_elements_class.json` to docs. --- .../weaviate_elements_class.json | 16 +- examples/ingest/weaviate/ingest.sh | 2 +- .../weaviate-test-helpers/create_schema.py | 4 +- scripts/weaviate-test-helpers/elements.json | 423 ++++++++++++++++++ .../test-ingest-weaviate-output.py | 7 +- test_unstructured_ingest/dest/weaviate.sh | 2 +- unstructured/ingest/cli/cmds/weaviate.py | 5 +- unstructured/ingest/connector/weaviate.py | 55 ++- 8 files changed, 485 insertions(+), 29 deletions(-) rename scripts/weaviate-test-helpers/pdf_elements.json => docs/source/ingest/destination_connectors/weaviate_elements_class.json (96%) create mode 100644 scripts/weaviate-test-helpers/elements.json diff --git a/scripts/weaviate-test-helpers/pdf_elements.json b/docs/source/ingest/destination_connectors/weaviate_elements_class.json similarity index 96% rename from scripts/weaviate-test-helpers/pdf_elements.json rename to docs/source/ingest/destination_connectors/weaviate_elements_class.json index 93b3656481..fe778594ab 100644 --- a/scripts/weaviate-test-helpers/pdf_elements.json +++ b/docs/source/ingest/destination_connectors/weaviate_elements_class.json @@
-1,5 +1,5 @@ { - "class": "Pdf_elements", + "class": "Elements", "invertedIndexConfig": { "bm25": { "b": 0.75, @@ -170,7 +170,17 @@ "indexSearchable": true, "name": "record_locator", "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "permissions_data", + "tokenization": "word" } + ] }, { @@ -228,7 +238,7 @@ }, { "dataType": [ - "int" + "text" ], "indexFilterable": true, "indexSearchable": false, @@ -254,7 +264,7 @@ }, { "dataType": [ - "text[]" + "text" ], "indexFilterable": true, "indexSearchable": true, diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh index 8e43fd0461..e5897c0582 100644 --- a/examples/ingest/weaviate/ingest.sh +++ b/examples/ingest/weaviate/ingest.sh @@ -17,5 +17,5 @@ PYTHONPATH=. ./unstructured/ingest/main.py \ --strategy fast \ weaviate \ --host-url http://localhost:8080 \ - --class-name pdf_elements \ + --class-name elements \ --batch-size 100 diff --git a/scripts/weaviate-test-helpers/create_schema.py b/scripts/weaviate-test-helpers/create_schema.py index cb1b2d567c..40eb80b39d 100755 --- a/scripts/weaviate-test-helpers/create_schema.py +++ b/scripts/weaviate-test-helpers/create_schema.py @@ -6,10 +6,10 @@ import weaviate weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") -class_name = os.getenv("WEAVIATE_CLASS_NAME", "Pdf_elements") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") new_class = None -with open("./scripts/weaviate-test-helpers/pdf_elements.json") as f: +with open("./scripts/weaviate-test-helpers/elements.json") as f: new_class = json.load(f) client = weaviate.Client( diff --git a/scripts/weaviate-test-helpers/elements.json b/scripts/weaviate-test-helpers/elements.json new file mode 100644 index 0000000000..fe778594ab --- /dev/null +++ b/scripts/weaviate-test-helpers/elements.json @@ -0,0 +1,423 @@ +{ + "class": "Elements", + "invertedIndexConfig": { + "bm25": { + "b": 0.75, + "k1": 
1.2 + }, + "cleanupIntervalSeconds": 60, + "stopwords": { + "additions": null, + "preset": "en", + "removals": null + } + }, + "multiTenancyConfig": { + "enabled": false + }, + "properties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "element_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "type", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "metadata", + "nestedProperties": [ + { + "dataType": [ + "int" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "category_depth" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "parent_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "attached_to_filename", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filetype", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "last_modified" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "file_directory", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filename", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "data_source", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + 
"text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "version", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_created" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_modified" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_processed" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "record_locator", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "permissions_data", + "tokenization": "word" + } + + ] + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "coordinates", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "system", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_width" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_height" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "points", + "tokenization": "word" + } + ] + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "languages", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "page_number" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "page_name", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + 
"dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "links", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_urls", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_texts", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_from", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_to", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "subject", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "section", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "header_footer_type", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_contents", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_tags", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text_as_html", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "regex_metadata", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "detection_class_prob" + } + ] + } + ], + "replicationConfig": { + "factor": 1 + }, + "shardingConfig": { + "virtualPerPhysical": 128, + "desiredCount": 1, + 
"actualCount": 1, + "desiredVirtualCount": 128, + "actualVirtualCount": 128, + "key": "_id", + "strategy": "hash", + "function": "murmur3" + }, + "vectorIndexConfig": { + "skip": false, + "cleanupIntervalSeconds": 300, + "maxConnections": 64, + "efConstruction": 128, + "ef": -1, + "dynamicEfMin": 100, + "dynamicEfMax": 500, + "dynamicEfFactor": 8, + "vectorCacheMaxObjects": 1000000000000, + "flatSearchCutoff": 40000, + "distance": "cosine", + "pq": { + "enabled": false, + "bitCompression": false, + "segments": 0, + "centroids": 256, + "trainingLimit": 100000, + "encoder": { + "type": "kmeans", + "distribution": "log-normal" + } + } + }, + "vectorIndexType": "hnsw", + "vectorizer": "none" +} \ No newline at end of file diff --git a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py index 1c7d9efb62..c64c929405 100755 --- a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py +++ b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 import os +import sys import weaviate weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") -class_name = os.getenv("WEAVIATE_CLASS_NAME", "Pdf_elements") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") N_ELEMENTS = 605 if __name__ == "__main__": @@ -20,5 +21,5 @@ try: assert count == N_ELEMENTS except AssertionError: - print(f"weaviate dest check failed: expected {N_ELEMENTS}, got {count}") - print("weaviate dest check complete") + sys.exit(f"weaviate dest check failed: got {count}, expected {N_ELEMENTS}") + print("weaviate dest check success") diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 27c8aad568..66b28bd32b 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -48,6 +48,6 @@ PYTHONPATH=. 
./unstructured/ingest/main.py \ --work-dir "$WORK_DIR" \ weaviate \ --host-url http://localhost:8080 \ - --class-name pdf_elements \ + --class-name elements \ scripts/weaviate-test-helpers/test-ingest-weaviate-output.py \ No newline at end of file diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index 44b646d1a1..4c00d17bbb 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -35,8 +35,9 @@ def get_cli_options() -> t.List[click.Option]: help=( "String representing a JSON-like dict with key,value containing " "the required parameters to create an authentication object. " - "The connector resolves the type of authentication object based on the parameters. " - "See https://weaviate.io/developers/weaviate/client-libraries/python_v3#api-key-authentication " + "The connector resolves th authentication object from the parameters. " + "See https://weaviate.io/developers/weaviate/client-libraries/python_v3" + "#api-key-authentication " "for more information." 
), ), diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 4a47217dc0..17638e0daa 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -69,25 +69,50 @@ def _resolve_auth_method(self): ) return None - def conform_dict(self, element: dict) -> None: + def conform_dict(self, data: dict) -> None: """ Updates the element dictionary to conform to the Weaviate schema """ - if ( - record_locator := element.get("metadata", {}) - .get("data_source", {}) - .get("record_locator") - ): + # Dict as string formatting + if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"): # Explicit casting otherwise fails schema type checking - element["metadata"]["data_source"]["record_locator"] = str(json.dumps(record_locator)) + data["metadata"]["data_source"]["record_locator"] = str(json.dumps(record_locator)) + + # Array of items as string formatting + if points := data.get("metadata", {}).get("coordinates", {}).get("points"): + data["metadata"]["coordinates"]["points"] = str(json.dumps(points)) + + if links := data.get("metadata", {}).get("links", {}): + data["metadata"]["links"] = str(json.dumps(links)) - if ( - date_modified := element.get("metadata", {}) - .get("data_source", {}) - .get("date_modified", None) + if permissions_data := ( + data.get("metadata", {}).get("data_source", {}).get("permissions_data") ): - element["metadata"]["data_source"]["date_modified"] = date_modified + "Z" + data["metadata"]["data_source"]["permissions_data"] = json.dumps(permissions_data) + + # Datetime formatting + if date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"): + data["metadata"]["data_source"]["date_created"] = date_created + "Z" + + if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"): + data["metadata"]["data_source"]["date_modified"] = date_modified + "Z" + + if date_processed := 
data.get("metadata", {}).get("data_source", {}).get("date_processed"): + data["metadata"]["data_source"]["date_processed"] = date_processed + "Z" + + if last_modified := data.get("metadata", {}).get("last_modified", {}): + data["metadata"]["last_modified"] = last_modified + "Z" + + # String casting + if version := data.get("metadata", {}).get("data_source", {}).get("version"): + data["metadata"]["data_source"]["version"] = str(version) + + if page_number := data.get("metadata", {}).get("page_number"): + data["metadata"]["page_number"] = str(page_number) + + if regex_metadata := data.get("metadata", {}).get("regex_metadata"): + data["metadata"]["regex_metadata"] = str(json.dumps(regex_metadata)) def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: logger.info( @@ -97,10 +122,9 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> ) self.client.batch.configure(batch_size=self.write_config.batch_size) with self.client.batch as b: - created = [] for e in json_list: self.conform_dict(e) - created_id = b.add_data_object( + b.add_data_object( { "type": e.get("type", ""), "element_id": e.get("element_id", ""), @@ -110,9 +134,6 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> self.connector_config.class_name, vector=e.get("embeddings"), ) - created.append(created_id) - - logger.info(f"Wrote {len(created)} elements.") @requires_dependencies(["weaviate"], extras="weaviate") def write(self, docs: t.List[BaseIngestDoc]) -> None: From 062d4e42e4be447b911a047365747b2c0fbc0d8e Mon Sep 17 00:00:00 2001 From: rvztz Date: Wed, 8 Nov 2023 12:59:29 -0600 Subject: [PATCH 09/26] pins weaviate python client version --- requirements/ingest/weaviate.in | 2 +- requirements/ingest/weaviate.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements/ingest/weaviate.in b/requirements/ingest/weaviate.in index 5336c66d9d..7d60b555fd 100644 --- a/requirements/ingest/weaviate.in 
+++ b/requirements/ingest/weaviate.in @@ -1,3 +1,3 @@ -c ../constraints.in -c ../base.txt -weaviate-client \ No newline at end of file +weaviate-client<=3.25.2 \ No newline at end of file diff --git a/requirements/ingest/weaviate.txt b/requirements/ingest/weaviate.txt index 68733b72e7..fe7c3d3cd1 100644 --- a/requirements/ingest/weaviate.txt +++ b/requirements/ingest/weaviate.txt @@ -14,7 +14,7 @@ certifi==2023.7.22 # requests cffi==1.16.0 # via cryptography -charset-normalizer==3.3.1 +charset-normalizer==3.3.2 # via # -c requirements/ingest/../base.txt # requests From 5d1a3f41bf56278b176d2c72afc72f1c2b49b2ff Mon Sep 17 00:00:00 2001 From: rvztz Date: Mon, 13 Nov 2023 04:35:39 -0600 Subject: [PATCH 10/26] version pinning on weaviate-client --- requirements/ingest/weaviate.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/ingest/weaviate.in b/requirements/ingest/weaviate.in index 7d60b555fd..1e01a68d3b 100644 --- a/requirements/ingest/weaviate.in +++ b/requirements/ingest/weaviate.in @@ -1,3 +1,3 @@ -c ../constraints.in -c ../base.txt -weaviate-client<=3.25.2 \ No newline at end of file +weaviate-client>=3.23.2 \ No newline at end of file From 68b60aa7ffa0325eb75fdf391437cdaf1ebed118 Mon Sep 17 00:00:00 2001 From: rvztz Date: Mon, 13 Nov 2023 15:51:58 -0600 Subject: [PATCH 11/26] bumps weaviate dependency version on constraints --- requirements/constraints.in | 2 +- requirements/ingest/weaviate.in | 2 +- requirements/ingest/weaviate.txt | 8 ++------ unstructured/ingest/connector/weaviate.py | 3 +++ 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/requirements/constraints.in b/requirements/constraints.in index 42d0d13960..2afffaae3b 100644 --- a/requirements/constraints.in +++ b/requirements/constraints.in @@ -35,7 +35,7 @@ pydantic<2 safetensors<=0.3.2 # use the known compatible version of weaviate and unstructured.pytesseract unstructured.pytesseract>=0.3.12 -weaviate-client==3.23.2 +weaviate-client>3.25.0 # Note(yuming) 
- pining to avoid conflict with paddle install matplotlib==3.7.2 # NOTE(crag) - pin to available pandas for python 3.8 (at least in CI) diff --git a/requirements/ingest/weaviate.in b/requirements/ingest/weaviate.in index 1e01a68d3b..ff297641ef 100644 --- a/requirements/ingest/weaviate.in +++ b/requirements/ingest/weaviate.in @@ -1,3 +1,3 @@ -c ../constraints.in -c ../base.txt -weaviate-client>=3.23.2 \ No newline at end of file +weaviate-client>3.25.0 \ No newline at end of file diff --git a/requirements/ingest/weaviate.txt b/requirements/ingest/weaviate.txt index fe7c3d3cd1..343ab5fe47 100644 --- a/requirements/ingest/weaviate.txt +++ b/requirements/ingest/weaviate.txt @@ -30,19 +30,15 @@ requests==2.31.0 # via # -c requirements/ingest/../base.txt # weaviate-client -tqdm==4.66.1 - # via - # -c requirements/ingest/../base.txt - # weaviate-client urllib3==1.26.18 # via # -c requirements/constraints.in # -c requirements/ingest/../base.txt # -c requirements/ingest/../constraints.in # requests -validators==0.21.0 +validators==0.22.0 # via weaviate-client -weaviate-client==3.23.2 +weaviate-client==3.25.3 # via # -c requirements/constraints.in # -c requirements/ingest/../constraints.in diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 17638e0daa..11be24dbeb 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -38,6 +38,9 @@ def initialize(self): auth = self._resolve_auth_method() self.client: Client = Client(url=self.connector_config.host_url, auth_client_secret=auth) + def check_connection(self): + pass + def _resolve_auth_method(self): if self.connector_config.auth_keys is None: return None From 7968d3ddf72233fb5c95af1e96a6408d83180521 Mon Sep 17 00:00:00 2001 From: rvztz Date: Mon, 13 Nov 2023 21:09:51 -0600 Subject: [PATCH 12/26] Modifies `test_unstructured.staging.test_weaviate.py::test_weaviate_schema_is_valid()` to make it compatible with newer client 
versions --- test_unstructured/staging/test_weaviate.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test_unstructured/staging/test_weaviate.py b/test_unstructured/staging/test_weaviate.py index 3bae60625c..b13e32edb7 100644 --- a/test_unstructured/staging/test_weaviate.py +++ b/test_unstructured/staging/test_weaviate.py @@ -7,7 +7,8 @@ # NOTE(robinson) - allows tests that do not require the weaviate client to # run for the docker container with contextlib.suppress(ModuleNotFoundError): - from weaviate.schema.validate_schema import validate_schema + from weaviate import Client + from weaviate.embedded import EmbeddedOptions from unstructured.partition.json import partition_json from unstructured.staging.weaviate import ( @@ -56,4 +57,5 @@ def test_stage_for_weaviate(filename="example-docs/layout-parser-paper-fast.pdf" def test_weaviate_schema_is_valid(): unstructured_class = create_unstructured_weaviate_class() schema = {"classes": [unstructured_class]} - validate_schema(schema) + client = Client(embedded_options=EmbeddedOptions()) + client.schema.create(schema) From 63b9413b179e06e5c4b47c3b84a364e41f2e5c8a Mon Sep 17 00:00:00 2001 From: rvztz Date: Mon, 13 Nov 2023 21:10:37 -0600 Subject: [PATCH 13/26] version bump --- CHANGELOG.md | 2 +- unstructured/__version__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9847aee676..0911de49e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.10.31-dev2 +## 0.10.31-dev3 ### Enhancements * **Temporary Support for paddle language parameter** User can specify default langage code for paddle with ENV `DEFAULT_PADDLE_LANG` before we have the language mapping for paddle. 
diff --git a/unstructured/__version__.py b/unstructured/__version__.py index e095cd431e..12863debb9 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.10.31-dev2" # pragma: no cover +__version__ = "0.10.31-dev3" # pragma: no cover From 4d204450c8707036b1032cfd59275647f9fc0530 Mon Sep 17 00:00:00 2001 From: rvztz Date: Thu, 16 Nov 2023 06:25:27 -0600 Subject: [PATCH 14/26] removes s3 source --- .../test-ingest-weaviate-output.py | 2 +- test_unstructured_ingest/dest/weaviate.sh | 18 ++++++++---------- unstructured/ingest/connector/weaviate.py | 12 ++++++++++-- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py index c64c929405..62a926292d 100755 --- a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py +++ b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py @@ -7,7 +7,7 @@ weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") -N_ELEMENTS = 605 +N_ELEMENTS = 5 if __name__ == "__main__": print(f"Checking contents of class collection " f"{class_name} at {weaviate_host_url}") diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 66b28bd32b..14dfd6d085 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -36,16 +36,14 @@ scripts/weaviate-test-helpers/create-weaviate-instance.sh wait PYTHONPATH=. 
./unstructured/ingest/main.py \ - s3 \ - --download-dir "$DOWNLOAD_DIR" \ - --strategy fast \ - --preserve-downloads \ - --reprocess \ - --output-dir "$OUTPUT_DIR" \ - --verbose \ - --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ - --anonymous \ - --work-dir "$WORK_DIR" \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/fake-memo.pdf \ + --work-dir "$WORK_DIR" \ weaviate \ --host-url http://localhost:8080 \ --class-name elements \ diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 11be24dbeb..2b35f0319d 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -2,7 +2,7 @@ import typing as t from dataclasses import dataclass -from unstructured.ingest.error import DestinationConnectionError +from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError from unstructured.ingest.interfaces import ( BaseConnectorConfig, BaseDestinationConnector, @@ -38,8 +38,16 @@ def initialize(self): auth = self._resolve_auth_method() self.client: Client = Client(url=self.connector_config.host_url, auth_client_secret=auth) + @requires_dependencies(["weaviate"], extras="weaviate") def check_connection(self): - pass + from weaviate import Client + try: + auth = self._resolve_auth_method() + _ = Client(url=self.connector_config.host_url, auth_client_secret=auth) + except Exception as e: + logger.error(f"Failed to validate connection {e}", exc_info=True) + raise SourceConnectionError(f"failed to validate connection: {e}") + def _resolve_auth_method(self): if self.connector_config.auth_keys is None: From 9d97829e08e99453ac37c00d6778dc19cef29b18 Mon Sep 17 00:00:00 2001 From: rvztz Date: Thu, 16 Nov 2023 14:41:44 -0600 Subject: [PATCH 15/26] adds chunking and embeddings flags to local source connector on `weaviate.sh` --- 
test_unstructured_ingest/dest/weaviate.sh | 21 ++++++++++++++------- unstructured/ingest/connector/weaviate.py | 2 +- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 14dfd6d085..0af25f46b9 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -10,6 +10,7 @@ OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR} OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME CI=${CI:-"false"} +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} # shellcheck disable=SC1091 source "$SCRIPT_DIR"/cleanup.sh @@ -37,13 +38,19 @@ wait PYTHONPATH=. ./unstructured/ingest/main.py \ local \ - --num-processes "$max_processes" \ - --output-dir "$OUTPUT_DIR" \ - --strategy fast \ - --verbose \ - --reprocess \ - --input-path example-docs/fake-memo.pdf \ - --work-dir "$WORK_DIR" \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --work-dir "$WORK_DIR" \ + --chunk-elements \ + --chunk-combine-text-under-n-chars 200\ + --chunk-new-after-n-chars 2500\ + --chunk-max-characters 38000\ + --chunk-multipage-sections \ + --embedding-provider "langchain-huggingface" \ weaviate \ --host-url http://localhost:8080 \ --class-name elements \ diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 2b35f0319d..fc2cc5ed5b 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -41,6 +41,7 @@ def initialize(self): @requires_dependencies(["weaviate"], extras="weaviate") def check_connection(self): from weaviate import Client + try: auth = self._resolve_auth_method() _ = Client(url=self.connector_config.host_url, auth_client_secret=auth) @@ -48,7 +49,6 @@ def 
check_connection(self): logger.error(f"Failed to validate connection {e}", exc_info=True) raise SourceConnectionError(f"failed to validate connection: {e}") - def _resolve_auth_method(self): if self.connector_config.auth_keys is None: return None From 3039d7c6723000db5d5138d5de6d92ac789f3151 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 21 Nov 2023 15:56:28 -0600 Subject: [PATCH 16/26] Fixes timestamp parsing --- .../test-ingest-weaviate-output.py | 2 +- test_unstructured_ingest/dest/weaviate.sh | 2 -- unstructured/ingest/connector/weaviate.py | 19 +++++++++++++++---- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py index 62a926292d..7ab7620850 100755 --- a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py +++ b/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py @@ -7,7 +7,7 @@ weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") -N_ELEMENTS = 5 +N_ELEMENTS = 2715 if __name__ == "__main__": print(f"Checking contents of class collection " f"{class_name} at {weaviate_host_url}") diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 0af25f46b9..6253a17de9 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -46,9 +46,7 @@ PYTHONPATH=. 
./unstructured/ingest/main.py \ --input-path example-docs/book-war-and-peace-1225p.txt \ --work-dir "$WORK_DIR" \ --chunk-elements \ - --chunk-combine-text-under-n-chars 200\ --chunk-new-after-n-chars 2500\ - --chunk-max-characters 38000\ --chunk-multipage-sections \ --embedding-provider "langchain-huggingface" \ weaviate \ diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index fc2cc5ed5b..cf7f2d2d63 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -84,6 +84,7 @@ def conform_dict(self, data: dict) -> None: """ Updates the element dictionary to conform to the Weaviate schema """ + from dateutil import parser # Dict as string formatting if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"): @@ -104,16 +105,26 @@ def conform_dict(self, data: dict) -> None: # Datetime formatting if date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"): - data["metadata"]["data_source"]["date_created"] = date_created + "Z" + data["metadata"]["data_source"]["date_created"] = parser.parse(date_created).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"): - data["metadata"]["data_source"]["date_modified"] = date_modified + "Z" + data["metadata"]["data_source"]["date_modified"] = parser.parse(date_modified).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) if date_processed := data.get("metadata", {}).get("data_source", {}).get("date_processed"): - data["metadata"]["data_source"]["date_processed"] = date_processed + "Z" + data["metadata"]["data_source"]["date_processed"] = parser.parse( + date_processed + ).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) if last_modified := data.get("metadata", {}).get("last_modified", {}): - data["metadata"]["last_modified"] = last_modified + "Z" + data["metadata"]["last_modified"] = parser.parse(last_modified).strftime( + 
"%Y-%m-%dT%H:%M:%S.%fZ", + ) # String casting if version := data.get("metadata", {}).get("data_source", {}).get("version"): From 44b5b34fdfa84cbfa94d8d04a9a02b49c5bb93fb Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 21 Nov 2023 17:10:48 -0600 Subject: [PATCH 17/26] moves test-ingest-weaviate-output.py --- test_unstructured_ingest/dest/weaviate.sh | 6 ++---- .../python}/test-ingest-weaviate-output.py | 2 +- unstructured/ingest/cli/cmds/weaviate.py | 9 +++------ unstructured/ingest/connector/weaviate.py | 7 +------ 4 files changed, 7 insertions(+), 17 deletions(-) rename {scripts/weaviate-test-helpers => test_unstructured_ingest/python}/test-ingest-weaviate-output.py (94%) diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 6253a17de9..39c6a84bcc 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -23,9 +23,7 @@ function cleanup { # Local file cleanup cleanup_dir "$WORK_DIR" cleanup_dir "$OUTPUT_DIR" - if [ "$CI" == "true" ]; then - cleanup_dir "$DOWNLOAD_DIR" - fi + } trap cleanup EXIT @@ -53,4 +51,4 @@ PYTHONPATH=. 
./unstructured/ingest/main.py \ --host-url http://localhost:8080 \ --class-name elements \ -scripts/weaviate-test-helpers/test-ingest-weaviate-output.py \ No newline at end of file +"$SCRIPT_DIR"/python/test-ingest-weaviate-output.py \ No newline at end of file diff --git a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py b/test_unstructured_ingest/python/test-ingest-weaviate-output.py similarity index 94% rename from scripts/weaviate-test-helpers/test-ingest-weaviate-output.py rename to test_unstructured_ingest/python/test-ingest-weaviate-output.py index 7ab7620850..6fd1b509c6 100755 --- a/scripts/weaviate-test-helpers/test-ingest-weaviate-output.py +++ b/test_unstructured_ingest/python/test-ingest-weaviate-output.py @@ -22,4 +22,4 @@ assert count == N_ELEMENTS except AssertionError: sys.exit(f"weaviate dest check failed: got {count}, expected {N_ELEMENTS}") - print("weaviate dest check success") + print("SUCCESS: weaviate dest check") diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index 4c00d17bbb..c15112d8c2 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -4,16 +4,13 @@ import click from unstructured.ingest.cli.interfaces import CliConfig, Dict +from unstructured.ingest.connector.weaviate import SimpleWeaviateConfig, WeaviateWriteConfig CMD_NAME = "weaviate" @dataclass -class WeaviateCliConfig(CliConfig): - host_url: str - class_name: str - auth_keys: t.Optional[t.Dict[str, str]] = None - +class WeaviateCliConfig(SimpleWeaviateConfig, CliConfig): @staticmethod def get_cli_options() -> t.List[click.Option]: options = [ @@ -46,7 +43,7 @@ def get_cli_options() -> t.List[click.Option]: @dataclass -class WeaviateCliWriteConfig(CliConfig): +class WeaviateCliWriteConfig(WeaviateWriteConfig, CliConfig): batch_size: int @staticmethod diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 
cf7f2d2d63..000a503a0b 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -147,12 +147,7 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> for e in json_list: self.conform_dict(e) b.add_data_object( - { - "type": e.get("type", ""), - "element_id": e.get("element_id", ""), - "metadata": e.get("metadata", {}), - "text": e.get("text", ""), - }, + e, self.connector_config.class_name, vector=e.get("embeddings"), ) From 96f10abaddbea79f1b3a4efa1dbe3de05f7299b6 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 21 Nov 2023 19:22:09 -0600 Subject: [PATCH 18/26] Removes duplicated changelog entry, removes weaviate-client version pin for weaviate.in requirements, sets local connector as source on examples/ingest/weaviate/ingest.sh --- CHANGELOG.md | 1 - examples/ingest/weaviate/ingest.sh | 17 +++++++++++------ requirements/ingest/weaviate.in | 2 +- requirements/ingest/weaviate.txt | 2 +- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 54a040a028..6c8d55b444 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,7 +52,6 @@ * **Add functionality to do a second OCR on cropped table images.** Changes to the values for scaling ENVs affect entire page OCR output(OCR regression) so we now do a second OCR for tables. * **Adds ability to pass timeout for a request when partitioning via a `url`.** `partition` now accepts a new optional parameter `request_timeout` which if set will prevent any `requests.get` from hanging indefinitely and instead will raise a timeout error. This is useful when partitioning a url that may be slow to respond or may not respond at all. -* **Weaviate destination connector** Weaviate connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to a Weaviate object collection. 
### Fixes diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh index e5897c0582..4fd59eae3d 100644 --- a/examples/ingest/weaviate/ingest.sh +++ b/examples/ingest/weaviate/ingest.sh @@ -8,13 +8,18 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) cd "$SCRIPT_DIR"/../../.. || exit 1 PYTHONPATH=. ./unstructured/ingest/main.py \ - s3 \ - --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ - --anonymous \ - --output-dir s3-small-batch-output-to-weaviate \ - --num-processes 2 \ - --verbose \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --work-dir "$WORK_DIR" \ + --chunk-elements \ + --chunk-new-after-n-chars 2500\ + --chunk-multipage-sections \ + --embedding-provider "langchain-huggingface" \ weaviate \ --host-url http://localhost:8080 \ --class-name elements \ diff --git a/requirements/ingest/weaviate.in b/requirements/ingest/weaviate.in index ff297641ef..5336c66d9d 100644 --- a/requirements/ingest/weaviate.in +++ b/requirements/ingest/weaviate.in @@ -1,3 +1,3 @@ -c ../constraints.in -c ../base.txt -weaviate-client>3.25.0 \ No newline at end of file +weaviate-client \ No newline at end of file diff --git a/requirements/ingest/weaviate.txt b/requirements/ingest/weaviate.txt index 343ab5fe47..b95aba5b4f 100644 --- a/requirements/ingest/weaviate.txt +++ b/requirements/ingest/weaviate.txt @@ -6,7 +6,7 @@ # authlib==1.2.1 # via weaviate-client -certifi==2023.7.22 +certifi==2023.11.17 # via # -c requirements/constraints.in # -c requirements/ingest/../base.txt From 13477dfafcbf2759518069a1fe2206f8a58a82c7 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 21 Nov 2023 19:32:51 -0600 Subject: [PATCH 19/26] Adds docs for weaviate connector --- docs/source/ingest/destination_connectors.rst | 1 + .../destination_connectors/weaviate.rst | 71 +++++++++++++++++++ 2 files 
changed, 72 insertions(+) create mode 100644 docs/source/ingest/destination_connectors/weaviate.rst diff --git a/docs/source/ingest/destination_connectors.rst b/docs/source/ingest/destination_connectors.rst index d9055d2074..23d806da25 100644 --- a/docs/source/ingest/destination_connectors.rst +++ b/docs/source/ingest/destination_connectors.rst @@ -11,3 +11,4 @@ in our community `Slack. `_ destination_connectors/azure_cognitive_search destination_connectors/delta_table destination_connectors/mongodb + destination_connectors/weaviate diff --git a/docs/source/ingest/destination_connectors/weaviate.rst b/docs/source/ingest/destination_connectors/weaviate.rst new file mode 100644 index 0000000000..94df4b7498 --- /dev/null +++ b/docs/source/ingest/destination_connectors/weaviate.rst @@ -0,0 +1,71 @@ +Weaviate +=========== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a Weaviate collection. + +First you'll need to install the weaviate dependencies as shown here. + +.. code:: shell + + pip install "unstructured[weaviate]" + +Run Locally +----------- +The upstream connector can be any of the ones supported; for convenience, the sample command below uses just one +upstream connector as an example. This will push elements into a collection schema of your choice in a Weaviate instance +running locally. + +.. tabs:: + + .. tab:: Shell + + .. code:: shell + + unstructured-ingest \ + s3 \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ + --anonymous \ + --output-dir s3-small-batch-output-to-sql \ + --num-processes 2 \ + --verbose \ + --strategy fast \ + weaviate \ + --host-url http://localhost:8080 \ + --class-name elements \ + + .. tab:: Python + + .. 
code:: python + + import subprocess + + command = [ + "unstructured-ingest", + "s3", + "--remote-url", "s3://utic-dev-tech-fixtures/small-pdf-set/", + "--anonymous", + "--output-dir", "s3-small-batch-output-to-postgresql", + "--num-processes", "2", + "--verbose", + "--strategy", "fast", + "weaviate" + "--host-url http://localhost:808" + "--class-name elements" + ] + + # Run the command + process = subprocess.Popen(command, stdout=subprocess.PIPE) + output, error = process.communicate() + + # Print output + if process.returncode == 0: + print('Command executed successfully. Output:') + print(output.decode()) + else: + print('Command failed. Error:') + print(error.decode()) + + +For a full list of the options the CLI accepts check ``unstructured-ingest weaviate --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. From 3fb6037be0fe8c2d956f0c2f4547f48416789be1 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 21 Nov 2023 19:41:32 -0600 Subject: [PATCH 20/26] removes unused var on ingest.sh example for weaviate --- examples/ingest/weaviate/ingest.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh index 4fd59eae3d..de46d061e3 100644 --- a/examples/ingest/weaviate/ingest.sh +++ b/examples/ingest/weaviate/ingest.sh @@ -4,9 +4,6 @@ # Structured outputs are stored in s3-small-batch-output-to-weaviate/ -SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -cd "$SCRIPT_DIR"/../../.. || exit 1 - PYTHONPATH=. 
./unstructured/ingest/main.py \ local \ --num-processes "$max_processes" \ From 7d84b7df1d600e9f9bb11efc8cacb7ff3f182970 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 21 Nov 2023 19:49:18 -0600 Subject: [PATCH 21/26] removes unused vars from example script --- examples/ingest/weaviate/ingest.sh | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh index de46d061e3..1e3bd85a41 100644 --- a/examples/ingest/weaviate/ingest.sh +++ b/examples/ingest/weaviate/ingest.sh @@ -4,15 +4,18 @@ # Structured outputs are stored in s3-small-batch-output-to-weaviate/ +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "$SCRIPT_DIR"/../../.. || exit 1 + PYTHONPATH=. ./unstructured/ingest/main.py \ local \ - --num-processes "$max_processes" \ - --output-dir "$OUTPUT_DIR" \ + --num-processes 2 \ + --output-dir weaviate-output \ --strategy fast \ --verbose \ --reprocess \ --input-path example-docs/book-war-and-peace-1225p.txt \ - --work-dir "$WORK_DIR" \ + --work-dir weaviate-work-dir \ --chunk-elements \ --chunk-new-after-n-chars 2500\ --chunk-multipage-sections \ From ab9199d07e70d1e11cabc76b5df9ab3c05fbddd0 Mon Sep 17 00:00:00 2001 From: rvztz Date: Wed, 22 Nov 2023 12:31:05 -0600 Subject: [PATCH 22/26] Modifies weaviate connector docs. Removes re-declaration of batch_size on WeaviateCliWriteConfig --- .../destination_connectors/weaviate.rst | 56 +++++++++---------- unstructured/ingest/cli/cmds/weaviate.py | 2 - unstructured/ingest/connector/weaviate.py | 3 +- 3 files changed, 28 insertions(+), 33 deletions(-) diff --git a/docs/source/ingest/destination_connectors/weaviate.rst b/docs/source/ingest/destination_connectors/weaviate.rst index 94df4b7498..cf2bd08acb 100644 --- a/docs/source/ingest/destination_connectors/weaviate.rst +++ b/docs/source/ingest/destination_connectors/weaviate.rst @@ -22,10 +22,10 @@ running locally. .. 
code:: shell unstructured-ingest \ - s3 \ - --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ + local \ + --input-path example-docs/fake-memo.pdf \ --anonymous \ - --output-dir s3-small-batch-output-to-sql \ + --output-dir local-output-to-weaviate \ --num-processes 2 \ --verbose \ --strategy fast \ @@ -37,33 +37,29 @@ running locally. .. code:: python - import subprocess - - command = [ - "unstructured-ingest", - "s3", - "--remote-url", "s3://utic-dev-tech-fixtures/small-pdf-set/", - "--anonymous", - "--output-dir", "s3-small-batch-output-to-postgresql", - "--num-processes", "2", - "--verbose", - "--strategy", "fast", - "weaviate" - "--host-url http://localhost:808" - "--class-name elements" - ] - - # Run the command - process = subprocess.Popen(command, stdout=subprocess.PIPE) - output, error = process.communicate() - - # Print output - if process.returncode == 0: - print('Command executed successfully. Output:') - print(output.decode()) - else: - print('Command failed. Error:') - print(error.decode()) + import os + + from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig + from unstructured.ingest.runner import LocalRunner + + if __name__ == "__main__": + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-weaviate", + num_processes=2, + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + writer_type="weaviate", + writer_kwargs={ + "host_url": os.getenv("WEAVIATE_HOST_URL"), + "class_name": os.getenv("WEAVIATE_CLASS_NAME") + } + ) + runner.run( + input_path="example-docs/fake-memo.pdf", + ) For a full list of the options the CLI accepts check ``unstructured-ingest weaviate --help``. 
diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py index c15112d8c2..d5e712850e 100644 --- a/unstructured/ingest/cli/cmds/weaviate.py +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -44,8 +44,6 @@ def get_cli_options() -> t.List[click.Option]: @dataclass class WeaviateCliWriteConfig(WeaviateWriteConfig, CliConfig): - batch_size: int - @staticmethod def get_cli_options() -> t.List[click.Option]: options = [ diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index 000a503a0b..dc7fc1a733 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -146,10 +146,11 @@ def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> with self.client.batch as b: for e in json_list: self.conform_dict(e) + vector = e.pop("embeddings", None) b.add_data_object( e, self.connector_config.class_name, - vector=e.get("embeddings"), + vector=vector, ) @requires_dependencies(["weaviate"], extras="weaviate") From dce9302ce558d56c0b2ba56528a314802a7aae46 Mon Sep 17 00:00:00 2001 From: rvztz Date: Wed, 22 Nov 2023 15:25:17 -0600 Subject: [PATCH 23/26] Copies local input connector parameters from mongodb.sh --- test_unstructured_ingest/dest/weaviate.sh | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh index 39c6a84bcc..dbbf00bb53 100755 --- a/test_unstructured_ingest/dest/weaviate.sh +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -41,11 +41,8 @@ PYTHONPATH=. 
./unstructured/ingest/main.py \ --strategy fast \ --verbose \ --reprocess \ - --input-path example-docs/book-war-and-peace-1225p.txt \ + --input-path example-docs/fake-memo.pdf \ --work-dir "$WORK_DIR" \ - --chunk-elements \ - --chunk-new-after-n-chars 2500\ - --chunk-multipage-sections \ --embedding-provider "langchain-huggingface" \ weaviate \ --host-url http://localhost:8080 \ From 54cbf9dc16009c767da5182a536229d66c3464de Mon Sep 17 00:00:00 2001 From: rvztz Date: Mon, 27 Nov 2023 23:53:38 -0600 Subject: [PATCH 24/26] version bump --- CHANGELOG.md | 2 +- unstructured/__version__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58d2cdf853..c0f6efca9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.11.1-dev2 +## 0.11.1-dev3 ### Enhancements diff --git a/unstructured/__version__.py b/unstructured/__version__.py index 1e1571eb56..28ab328149 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.11.1-dev2" # pragma: no cover +__version__ = "0.11.1-dev3" # pragma: no cover From a1fe16d273c4eac154324f44fcdde5ae0162e901 Mon Sep 17 00:00:00 2001 From: rvztz Date: Tue, 28 Nov 2023 02:24:01 -0600 Subject: [PATCH 25/26] Sets correct expected number of documents created --- .../python/test-ingest-weaviate-output.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test_unstructured_ingest/python/test-ingest-weaviate-output.py b/test_unstructured_ingest/python/test-ingest-weaviate-output.py index 6fd1b509c6..846a7eb7f9 100755 --- a/test_unstructured_ingest/python/test-ingest-weaviate-output.py +++ b/test_unstructured_ingest/python/test-ingest-weaviate-output.py @@ -7,7 +7,7 @@ weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") -N_ELEMENTS = 2715 +N_ELEMENTS = 5 if __name__ == "__main__": print(f"Checking contents of class collection " f"{class_name} 
at {weaviate_host_url}") @@ -21,5 +21,5 @@ try: assert count == N_ELEMENTS except AssertionError: - sys.exit(f"weaviate dest check failed: got {count}, expected {N_ELEMENTS}") + sys.exit(f"FAIL: weaviate dest check failed: got {count}, expected {N_ELEMENTS}") print("SUCCESS: weaviate dest check") From 1add053d8714e06cb7d3beb08d6cd746aa2a92e1 Mon Sep 17 00:00:00 2001 From: rvztz Date: Thu, 30 Nov 2023 02:55:27 -0600 Subject: [PATCH 26/26] Sets single client instance at WeaviateDestinationConnector level --- unstructured/ingest/connector/weaviate.py | 26 +++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py index dc7fc1a733..2287dab638 100644 --- a/unstructured/ingest/connector/weaviate.py +++ b/unstructured/ingest/connector/weaviate.py @@ -1,6 +1,6 @@ import json import typing as t -from dataclasses import dataclass +from dataclasses import dataclass, field from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError from unstructured.ingest.interfaces import ( @@ -12,6 +12,9 @@ from unstructured.ingest.logger import logger from unstructured.utils import requires_dependencies +if t.TYPE_CHECKING: + from weaviate import Client + @dataclass class SimpleWeaviateConfig(BaseConnectorConfig): @@ -29,22 +32,27 @@ class WeaviateWriteConfig(WriteConfig): class WeaviateDestinationConnector(BaseDestinationConnector): write_config: WeaviateWriteConfig connector_config: SimpleWeaviateConfig + _client: t.Optional["Client"] = field(init=False, default=None) + + @property + @requires_dependencies(["weaviate"], extras="weaviate") + def client(self) -> "Client": + if self._client is None: + from weaviate import Client + + auth = self._resolve_auth_method() + self._client = Client(url=self.connector_config.host_url, auth_client_secret=auth) + return self._client @requires_dependencies(["weaviate"], extras="weaviate") 
@DestinationConnectionError.wrap def initialize(self): - from weaviate import Client - - auth = self._resolve_auth_method() - self.client: Client = Client(url=self.connector_config.host_url, auth_client_secret=auth) + _ = self.client @requires_dependencies(["weaviate"], extras="weaviate") def check_connection(self): - from weaviate import Client - try: - auth = self._resolve_auth_method() - _ = Client(url=self.connector_config.host_url, auth_client_secret=auth) + _ = self.client except Exception as e: logger.error(f"Failed to validate connection {e}", exc_info=True) raise SourceConnectionError(f"failed to validate connection: {e}")