diff --git a/CHANGELOG.md b/CHANGELOG.md index ea2d19c906..c1fa0cdf5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.11.4-dev1 +## 0.11.4-dev2 ### Enhancements @@ -6,6 +6,9 @@ ### Features +* **Add Weaviate destination connector** Weaviate connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to a Weaviate object collection. + + ### Fixes ## 0.11.3 diff --git a/Makefile b/Makefile index 16a07faf0a..26531282b8 100644 --- a/Makefile +++ b/Makefile @@ -191,6 +191,10 @@ install-ingest-airtable: install-ingest-sharepoint: python3 -m pip install -r requirements/ingest/sharepoint.txt +.PHONY: install-ingest-weaviate +install-ingest-weaviate: + python3 -m pip install -r requirements/ingest/weaviate.txt + .PHONY: install-ingest-local install-ingest-local: echo "no unique dependencies for local connector" diff --git a/docs/source/ingest/destination_connectors.rst b/docs/source/ingest/destination_connectors.rst index 9f9d44ede8..8fcedf5495 100644 --- a/docs/source/ingest/destination_connectors.rst +++ b/docs/source/ingest/destination_connectors.rst @@ -11,5 +11,6 @@ in our community `Slack. `_ destination_connectors/azure_cognitive_search destination_connectors/delta_table destination_connectors/mongodb + destination_connectors/weaviate destination_connectors/pinecone destination_connectors/s3 diff --git a/docs/source/ingest/destination_connectors/weaviate.rst b/docs/source/ingest/destination_connectors/weaviate.rst new file mode 100644 index 0000000000..cf2bd08acb --- /dev/null +++ b/docs/source/ingest/destination_connectors/weaviate.rst @@ -0,0 +1,67 @@ +Weaviate +=========== + +Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to a Weaviate collection. + +First you'll need to install the weaviate dependencies as shown here. + +.. code:: shell + + pip install "unstructured[weaviate]" + +Run Locally +----------- +The upstream connector can be any of the ones supported, but for convenience here, showing a sample command using the +upstream weaviate connector. This will push elements into a collection schema of your choice into a weaviate instance +running locally. + +.. tabs:: + + .. tab:: Shell + + .. code:: shell + + unstructured-ingest \ + local \ + --input-path example-docs/fake-memo.pdf \ + --anonymous \ + --output-dir local-output-to-weaviate \ + --num-processes 2 \ + --verbose \ + --strategy fast \ + weaviate \ + --host-url http://localhost:8080 \ + --class-name elements \ + + .. tab:: Python + + .. code:: python + + import os + + from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig + from unstructured.ingest.runner import LocalRunner + + if __name__ == "__main__": + runner = LocalRunner( + processor_config=ProcessorConfig( + verbose=True, + output_dir="local-output-to-weaviate", + num_processes=2, + ), + read_config=ReadConfig(), + partition_config=PartitionConfig(), + writer_type="weaviate", + writer_kwargs={ + "host_url": os.getenv("WEAVIATE_HOST_URL"), + "class_name": os.getenv("WEAVIATE_CLASS_NAME") + } + ) + runner.run( + input_path="example-docs/fake-memo.pdf", + ) + + +For a full list of the options the CLI accepts check ``unstructured-ingest weaviate --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. diff --git a/docs/source/ingest/destination_connectors/weaviate_elements_class.json b/docs/source/ingest/destination_connectors/weaviate_elements_class.json new file mode 100644 index 0000000000..fe778594ab --- /dev/null +++ b/docs/source/ingest/destination_connectors/weaviate_elements_class.json @@ -0,0 +1,423 @@ +{ + "class": "Elements", + "invertedIndexConfig": { + "bm25": { + "b": 0.75, + "k1": 1.2 + }, + "cleanupIntervalSeconds": 60, + "stopwords": { + "additions": null, + "preset": "en", + "removals": null + } + }, + "multiTenancyConfig": { + "enabled": false + }, + "properties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "element_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "type", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "metadata", + "nestedProperties": [ + { + "dataType": [ + "int" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "category_depth" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "parent_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "attached_to_filename", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filetype", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "last_modified" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "file_directory", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filename", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "data_source", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "version", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_created" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_modified" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_processed" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "record_locator", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "permissions_data", + "tokenization": "word" + } + + ] + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "coordinates", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "system", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_width" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_height" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "points", + "tokenization": "word" + } + ] + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "languages", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "page_number" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "page_name", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "links", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_urls", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_texts", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_from", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_to", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "subject", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "section", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "header_footer_type", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_contents", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_tags", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text_as_html", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "regex_metadata", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "detection_class_prob" + } + ] + } + ], + "replicationConfig": { + "factor": 1 + }, + "shardingConfig": { + "virtualPerPhysical": 128, + "desiredCount": 1, + "actualCount": 1, + "desiredVirtualCount": 128, + "actualVirtualCount": 128, + "key": "_id", + "strategy": "hash", + "function": "murmur3" + }, + "vectorIndexConfig": { + "skip": false, + "cleanupIntervalSeconds": 300, + "maxConnections": 64, + "efConstruction": 128, + "ef": -1, + "dynamicEfMin": 100, + "dynamicEfMax": 500, + "dynamicEfFactor": 8, + "vectorCacheMaxObjects": 1000000000000, + "flatSearchCutoff": 40000, + "distance": "cosine", + "pq": { + "enabled": false, + "bitCompression": false, + "segments": 0, + "centroids": 256, + "trainingLimit": 100000, + "encoder": { + "type": "kmeans", + "distribution": "log-normal" + } + } + }, + "vectorIndexType": "hnsw", + "vectorizer": "none" +} \ No newline at end of file diff --git a/examples/ingest/weaviate/ingest.sh b/examples/ingest/weaviate/ingest.sh new file mode 100644 index 0000000000..1e3bd85a41 --- /dev/null +++ b/examples/ingest/weaviate/ingest.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# Uploads the structured output of the files within the given S3 path to a Weaviate index. + +# Structured outputs are stored in s3-small-batch-output-to-weaviate/ + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "$SCRIPT_DIR"/../../.. || exit 1 + +PYTHONPATH=. ./unstructured/ingest/main.py \ + local \ + --num-processes 2 \ + --output-dir weaviate-output \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/book-war-and-peace-1225p.txt \ + --work-dir weaviate-work-dir \ + --chunk-elements \ + --chunk-new-after-n-chars 2500\ + --chunk-multipage-sections \ + --embedding-provider "langchain-huggingface" \ + weaviate \ + --host-url http://localhost:8080 \ + --class-name elements \ + --batch-size 100 diff --git a/requirements/constraints.in b/requirements/constraints.in index 42d0d13960..2afffaae3b 100644 --- a/requirements/constraints.in +++ b/requirements/constraints.in @@ -35,7 +35,7 @@ pydantic<2 safetensors<=0.3.2 # use the known compatible version of weaviate and unstructured.pytesseract unstructured.pytesseract>=0.3.12 -weaviate-client==3.23.2 +weaviate-client>3.25.0 # Note(yuming) - pining to avoid conflict with paddle install matplotlib==3.7.2 # NOTE(crag) - pin to available pandas for python 3.8 (at least in CI) diff --git a/requirements/ingest/weaviate.in b/requirements/ingest/weaviate.in new file mode 100644 index 0000000000..5336c66d9d --- /dev/null +++ b/requirements/ingest/weaviate.in @@ -0,0 +1,3 @@ +-c ../constraints.in +-c ../base.txt +weaviate-client \ No newline at end of file diff --git a/requirements/ingest/weaviate.txt b/requirements/ingest/weaviate.txt new file mode 100644 index 0000000000..b95aba5b4f --- /dev/null +++ b/requirements/ingest/weaviate.txt @@ -0,0 +1,45 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile --constraint=requirements/constraints.in requirements/ingest/weaviate.in +# +authlib==1.2.1 + # via weaviate-client +certifi==2023.11.17 + # via + # -c requirements/constraints.in + # -c requirements/ingest/../base.txt + # -c requirements/ingest/../constraints.in + # requests +cffi==1.16.0 + # via cryptography +charset-normalizer==3.3.2 + # via + # -c requirements/ingest/../base.txt + # requests +cryptography==41.0.5 + # via authlib +idna==3.4 + # via + # -c requirements/ingest/../base.txt + # requests +pycparser==2.21 + # via cffi +requests==2.31.0 + # via + # -c requirements/ingest/../base.txt + # weaviate-client +urllib3==1.26.18 + # via + # -c requirements/constraints.in + # -c requirements/ingest/../base.txt + # -c requirements/ingest/../constraints.in + # requests +validators==0.22.0 + # via weaviate-client +weaviate-client==3.25.3 + # via + # -c requirements/constraints.in + # -c requirements/ingest/../constraints.in + # -r requirements/ingest/weaviate.in diff --git a/scripts/weaviate-test-helpers/create-weaviate-instance.sh b/scripts/weaviate-test-helpers/create-weaviate-instance.sh new file mode 100755 index 0000000000..82da1eeb18 --- /dev/null +++ b/scripts/weaviate-test-helpers/create-weaviate-instance.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") + +# Create the Weaviate instance +docker compose version +docker compose -f "$SCRIPT_DIR"/docker-compose.yml up --wait +docker compose -f "$SCRIPT_DIR"/docker-compose.yml ps + +echo "Instance is live." +"$SCRIPT_DIR"/create_schema.py diff --git a/scripts/weaviate-test-helpers/create_schema.py b/scripts/weaviate-test-helpers/create_schema.py new file mode 100755 index 0000000000..40eb80b39d --- /dev/null +++ b/scripts/weaviate-test-helpers/create_schema.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +import json +import os + +import weaviate + +weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") +new_class = None + +with open("./scripts/weaviate-test-helpers/elements.json") as f: + new_class = json.load(f) + +client = weaviate.Client( + url=weaviate_host_url, +) + +if client.schema.exists(class_name): + client.schema.delete_class(class_name) +client.schema.create_class(new_class) diff --git a/scripts/weaviate-test-helpers/docker-compose.yml b/scripts/weaviate-test-helpers/docker-compose.yml new file mode 100755 index 0000000000..57bfafab8e --- /dev/null +++ b/scripts/weaviate-test-helpers/docker-compose.yml @@ -0,0 +1,21 @@ +version: '3.4' +services: + weaviate: + command: + - --host + - 0.0.0.0 + - --port + - '8080' + - --scheme + - http + image: semitechnologies/weaviate:1.22.1 + ports: + - 8080:8080 + restart: on-failure:0 + environment: + QUERY_DEFAULTS_LIMIT: 25 + AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' + PERSISTENCE_DATA_PATH: '/var/lib/weaviate' + DEFAULT_VECTORIZER_MODULE: 'none' + ENABLE_MODULES: '' + CLUSTER_HOSTNAME: 'node1' diff --git a/scripts/weaviate-test-helpers/elements.json b/scripts/weaviate-test-helpers/elements.json new file mode 100644 index 0000000000..fe778594ab --- /dev/null +++ b/scripts/weaviate-test-helpers/elements.json @@ -0,0 +1,423 @@ +{ + "class": "Elements", + "invertedIndexConfig": { + "bm25": { + "b": 0.75, + "k1": 1.2 + }, + "cleanupIntervalSeconds": 60, + "stopwords": { + "additions": null, + "preset": "en", + "removals": null + } + }, + "multiTenancyConfig": { + "enabled": false + }, + "properties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "element_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "type", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "metadata", + "nestedProperties": [ + { + "dataType": [ + "int" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "category_depth" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "parent_id", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "attached_to_filename", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filetype", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "last_modified" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "file_directory", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "filename", + "tokenization": "word" + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "data_source", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "version", + "tokenization": "word" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_created" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_modified" + }, + { + "dataType": [ + "date" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "date_processed" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "record_locator", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "permissions_data", + "tokenization": "word" + } + + ] + }, + { + "dataType": [ + "object" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "coordinates", + "nestedProperties": [ + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "system", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_width" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "layout_height" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "points", + "tokenization": "word" + } + ] + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "languages", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "page_number" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "page_name", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "url", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "links", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_urls", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "link_texts", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_from", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "sent_to", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "subject", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "section", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "header_footer_type", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_contents", + "tokenization": "word" + }, + { + "dataType": [ + "text[]" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "emphasized_text_tags", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "text_as_html", + "tokenization": "word" + }, + { + "dataType": [ + "text" + ], + "indexFilterable": true, + "indexSearchable": true, + "name": "regex_metadata", + "tokenization": "word" + }, + { + "dataType": [ + "number" + ], + "indexFilterable": true, + "indexSearchable": false, + "name": "detection_class_prob" + } + ] + } + ], + "replicationConfig": { + "factor": 1 + }, + "shardingConfig": { + "virtualPerPhysical": 128, + "desiredCount": 1, + "actualCount": 1, + "desiredVirtualCount": 128, + "actualVirtualCount": 128, + "key": "_id", + "strategy": "hash", + "function": "murmur3" + }, + "vectorIndexConfig": { + "skip": false, + "cleanupIntervalSeconds": 300, + "maxConnections": 64, + "efConstruction": 128, + "ef": -1, + "dynamicEfMin": 100, + "dynamicEfMax": 500, + "dynamicEfFactor": 8, + "vectorCacheMaxObjects": 1000000000000, + "flatSearchCutoff": 40000, + "distance": "cosine", + "pq": { + "enabled": false, + "bitCompression": false, + "segments": 0, + "centroids": 256, + "trainingLimit": 100000, + "encoder": { + "type": "kmeans", + "distribution": "log-normal" + } + } + }, + "vectorIndexType": "hnsw", + "vectorizer": "none" +} \ No newline at end of file diff --git a/setup.py b/setup.py index ea92216dca..d7d1f7233c 100644 --- a/setup.py +++ b/setup.py @@ -155,6 +155,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List "salesforce": load_requirements("requirements/ingest/salesforce.in"), "slack": load_requirements("requirements/ingest/slack.in"), "wikipedia": load_requirements("requirements/ingest/wikipedia.in"), + "weaviate": load_requirements("requirements/ingest/weaviate.in"), # Legacy extra requirements "huggingface": load_requirements("requirements/huggingface.in"), "local-inference": all_doc_reqs, diff --git a/test_unstructured/staging/test_weaviate.py b/test_unstructured/staging/test_weaviate.py index 3bae60625c..b13e32edb7 100644 --- a/test_unstructured/staging/test_weaviate.py +++ b/test_unstructured/staging/test_weaviate.py @@ -7,7 +7,8 @@ # NOTE(robinson) - allows tests that do not require the weaviate client to # run for the docker container with contextlib.suppress(ModuleNotFoundError): - from weaviate.schema.validate_schema import validate_schema + from weaviate import Client + from weaviate.embedded import EmbeddedOptions from unstructured.partition.json import partition_json from unstructured.staging.weaviate import ( @@ -56,4 +57,5 @@ def test_stage_for_weaviate(filename="example-docs/layout-parser-paper-fast.pdf" def test_weaviate_schema_is_valid(): unstructured_class = create_unstructured_weaviate_class() schema = {"classes": [unstructured_class]} - validate_schema(schema) + client = Client(embedded_options=EmbeddedOptions()) + client.schema.create(schema) diff --git a/test_unstructured_ingest/dest/weaviate.sh b/test_unstructured_ingest/dest/weaviate.sh new file mode 100755 index 0000000000..dbbf00bb53 --- /dev/null +++ b/test_unstructured_ingest/dest/weaviate.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash + +set -e + +DEST_PATH=$(dirname "$(realpath "$0")") +SCRIPT_DIR=$(dirname "$DEST_PATH") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=weaviate-dest +OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR} +OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME +WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME +CI=${CI:-"false"} +max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")} + +# shellcheck disable=SC1091 +source "$SCRIPT_DIR"/cleanup.sh +function cleanup { + # Index cleanup + echo "Stopping Weaviate Docker container" + docker-compose -f scripts/weaviate-test-helpers/docker-compose.yml down --remove-orphans -v + + + # Local file cleanup + cleanup_dir "$WORK_DIR" + cleanup_dir "$OUTPUT_DIR" + +} + +trap cleanup EXIT + +# Create weaviate instance and create `elements` class +echo "Creating weaviate instance" +# shellcheck source=/dev/null +scripts/weaviate-test-helpers/create-weaviate-instance.sh +wait + +PYTHONPATH=. ./unstructured/ingest/main.py \ + local \ + --num-processes "$max_processes" \ + --output-dir "$OUTPUT_DIR" \ + --strategy fast \ + --verbose \ + --reprocess \ + --input-path example-docs/fake-memo.pdf \ + --work-dir "$WORK_DIR" \ + --embedding-provider "langchain-huggingface" \ + weaviate \ + --host-url http://localhost:8080 \ + --class-name elements \ + +"$SCRIPT_DIR"/python/test-ingest-weaviate-output.py \ No newline at end of file diff --git a/test_unstructured_ingest/python/test-ingest-weaviate-output.py b/test_unstructured_ingest/python/test-ingest-weaviate-output.py new file mode 100755 index 0000000000..846a7eb7f9 --- /dev/null +++ b/test_unstructured_ingest/python/test-ingest-weaviate-output.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + +import os +import sys + +import weaviate + +weaviate_host_url = os.getenv("WEAVIATE_HOST_URL", "http://localhost:8080") +class_name = os.getenv("WEAVIATE_CLASS_NAME", "Elements") +N_ELEMENTS = 5 + +if __name__ == "__main__": + print(f"Checking contents of class collection " f"{class_name} at {weaviate_host_url}") + + client = weaviate.Client( + url=weaviate_host_url, + ) + + response = client.query.aggregate(class_name).with_meta_count().do() + count = response["data"]["Aggregate"][class_name][0]["meta"]["count"] + try: + assert count == N_ELEMENTS + except AssertionError: + sys.exit(f"FAIL: weaviate dest check failed: got {count}, expected {N_ELEMENTS}") + print("SUCCESS: weaviate dest check") diff --git a/test_unstructured_ingest/test-ingest-dest.sh b/test_unstructured_ingest/test-ingest-dest.sh index 43815c6ba5..01eef7057b 100755 --- a/test_unstructured_ingest/test-ingest-dest.sh +++ b/test_unstructured_ingest/test-ingest-dest.sh @@ -24,6 +24,7 @@ all_tests=( 'mongodb.sh' 'pinecone.sh' 's3.sh' + 'weaviate.sh' 'sharepoint-embed-cog-index.sh' ) diff --git a/unstructured/__version__.py b/unstructured/__version__.py index 510b9a48ec..6feed12e46 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.11.4-dev1" # pragma: no cover +__version__ = "0.11.4-dev2" # pragma: no cover diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index eb18dbfb33..c67a3b1234 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -40,6 +40,7 @@ from .salesforce import get_base_src_cmd as salesforce_base_src_cmd from .sharepoint import get_base_src_cmd as sharepoint_base_src_cmd from .slack import get_base_src_cmd as slack_base_src_cmd +from .weaviate import get_base_dest_cmd as weaviate_dest_cmd from .wikipedia import get_base_src_cmd as wikipedia_base_src_cmd if t.TYPE_CHECKING: @@ -93,6 +94,7 @@ s3_base_dest_cmd, azure_cognitive_search_base_dest_cmd, delta_table_dest_cmd, + weaviate_dest_cmd, mongo_base_dest_cmd, pinecone_base_dest_cmd, ] diff --git a/unstructured/ingest/cli/cmds/weaviate.py b/unstructured/ingest/cli/cmds/weaviate.py new file mode 100644 index 0000000000..d5e712850e --- /dev/null +++ b/unstructured/ingest/cli/cmds/weaviate.py @@ -0,0 +1,68 @@ +import typing as t +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.interfaces import CliConfig, Dict +from unstructured.ingest.connector.weaviate import SimpleWeaviateConfig, WeaviateWriteConfig + +CMD_NAME = "weaviate" + + +@dataclass +class WeaviateCliConfig(SimpleWeaviateConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--host-url"], + required=True, + help="Weaviate instance url", + ), + click.Option( + ["--class-name"], + default=None, + type=str, + help="Name of the class to push the records into, e.g: Pdf-elements", + ), + click.Option( + ["--auth-keys"], + required=False, + type=Dict(), + help=( + "String representing a JSON-like dict with key,value containing " + "the required parameters to create an authentication object. " + "The connector resolves th authentication object from the parameters. " + "See https://weaviate.io/developers/weaviate/client-libraries/python_v3" + "#api-key-authentication " + "for more information." + ), + ), + ] + return options + + +@dataclass +class WeaviateCliWriteConfig(WeaviateWriteConfig, CliConfig): + @staticmethod + def get_cli_options() -> t.List[click.Option]: + options = [ + click.Option( + ["--batch-size"], + default=100, + type=int, + help="Number of records per batch", + ) + ] + return options + + +def get_base_dest_cmd(): + from unstructured.ingest.cli.base.dest import BaseDestCmd + + cmd_cls = BaseDestCmd( + cmd_name=CMD_NAME, + cli_config=WeaviateCliConfig, + additional_cli_options=[WeaviateCliWriteConfig], + ) + return cmd_cls diff --git a/unstructured/ingest/connector/weaviate.py b/unstructured/ingest/connector/weaviate.py new file mode 100644 index 0000000000..2287dab638 --- /dev/null +++ b/unstructured/ingest/connector/weaviate.py @@ -0,0 +1,175 @@ +import json +import typing as t +from dataclasses import dataclass, field + +from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError +from unstructured.ingest.interfaces import ( + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + +if t.TYPE_CHECKING: + from weaviate import Client + + +@dataclass +class SimpleWeaviateConfig(BaseConnectorConfig): + host_url: str + class_name: str + auth_keys: t.Optional[t.Dict[str, str]] = None + + +@dataclass +class WeaviateWriteConfig(WriteConfig): + batch_size: int = 100 + + +@dataclass +class WeaviateDestinationConnector(BaseDestinationConnector): + write_config: WeaviateWriteConfig + connector_config: SimpleWeaviateConfig + _client: t.Optional["Client"] = field(init=False, default=None) + + @property + @requires_dependencies(["weaviate"], extras="weaviate") + def client(self) -> "Client": + if self._client is None: + from weaviate import Client + + auth = self._resolve_auth_method() + self._client = Client(url=self.connector_config.host_url, auth_client_secret=auth) + return self._client + + @requires_dependencies(["weaviate"], extras="weaviate") + @DestinationConnectionError.wrap + def initialize(self): + _ = self.client + + @requires_dependencies(["weaviate"], extras="weaviate") + def check_connection(self): + try: + _ = self.client + except Exception as e: + logger.error(f"Failed to validate connection {e}", exc_info=True) + raise SourceConnectionError(f"failed to validate connection: {e}") + + def _resolve_auth_method(self): + if self.connector_config.auth_keys is None: + return None + + if access_token := self.connector_config.auth_keys.get("access_token"): + from weaviate.auth import AuthBearerToken + + return AuthBearerToken( + access_token=access_token, + refresh_token=self.connector_config.auth_keys.get("refresh_token"), + ) + elif api_key := self.connector_config.auth_keys.get("api_key"): + from weaviate.auth import AuthApiKey + + return AuthApiKey(api_key=api_key) + elif client_secret := self.connector_config.auth_keys.get("client_secret"): + from weaviate.auth import AuthClientCredentials + + return AuthClientCredentials( + client_secret=client_secret, scope=self.connector_config.auth_keys.get("scope") + ) + elif (username := self.connector_config.auth_keys.get("username")) and ( + pwd := self.connector_config.auth_keys.get("password") + ): + from weaviate.auth import AuthClientPassword + + return AuthClientPassword( + username=username, password=pwd, scope=self.connector_config.auth_keys.get("scope") + ) + return None + + def conform_dict(self, data: dict) -> None: + """ + Updates the element dictionary to conform to the Weaviate schema + """ + from dateutil import parser + + # Dict as string formatting + if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"): + # Explicit casting otherwise fails schema type checking + data["metadata"]["data_source"]["record_locator"] = str(json.dumps(record_locator)) + + # Array of items as string formatting + if points := data.get("metadata", {}).get("coordinates", {}).get("points"): + data["metadata"]["coordinates"]["points"] = str(json.dumps(points)) + + if links := data.get("metadata", {}).get("links", {}): + data["metadata"]["links"] = str(json.dumps(links)) + + if permissions_data := ( + data.get("metadata", {}).get("data_source", {}).get("permissions_data") + ): + data["metadata"]["data_source"]["permissions_data"] = json.dumps(permissions_data) + + # Datetime formatting + if date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"): + data["metadata"]["data_source"]["date_created"] = parser.parse(date_created).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + + if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"): + data["metadata"]["data_source"]["date_modified"] = parser.parse(date_modified).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + + if date_processed := data.get("metadata", {}).get("data_source", {}).get("date_processed"): + data["metadata"]["data_source"]["date_processed"] = parser.parse( + date_processed + ).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + + if last_modified := data.get("metadata", {}).get("last_modified", {}): + data["metadata"]["last_modified"] = parser.parse(last_modified).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + + # String casting + if version := data.get("metadata", {}).get("data_source", {}).get("version"): + data["metadata"]["data_source"]["version"] = str(version) + + if page_number := data.get("metadata", {}).get("page_number"): + data["metadata"]["page_number"] = str(page_number) + + if regex_metadata := data.get("metadata", {}).get("regex_metadata"): + data["metadata"]["regex_metadata"] = str(json.dumps(regex_metadata)) + + def write_dict(self, *args, json_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None: + logger.info( + f"writing {len(json_list)} objects to destination " + f"class {self.connector_config.class_name} " + f"at {self.connector_config.host_url}", + ) + self.client.batch.configure(batch_size=self.write_config.batch_size) + with self.client.batch as b: + for e in json_list: + self.conform_dict(e) + vector = e.pop("embeddings", None) + b.add_data_object( + e, + self.connector_config.class_name, + vector=vector, + ) + + @requires_dependencies(["weaviate"], extras="weaviate") + def write(self, docs: t.List[BaseIngestDoc]) -> None: + json_list: t.List[t.Dict[str, t.Any]] = [] + for doc in docs: + local_path = doc._output_filename + with open(local_path) as json_file: + json_content = json.load(json_file) + logger.info( + f"appending {len(json_content)} json elements from content in {local_path}", + ) + json_list.extend(json_content) + self.write_dict(json_list=json_list) diff --git a/unstructured/ingest/runner/writers/__init__.py b/unstructured/ingest/runner/writers/__init__.py index e8df868642..52905b92a9 100644 --- a/unstructured/ingest/runner/writers/__init__.py +++ b/unstructured/ingest/runner/writers/__init__.py @@ -9,6 +9,7 @@ from .mongodb import mongodb_writer from .pinecone import pinecone_writer from .s3 import s3_writer +from .weaviate import weaviate_writer writer_map: t.Dict[str, t.Callable] = { "azure": azure_writer, @@ -19,6 +20,7 @@ "gcs": gcs_writer, "mongodb": mongodb_writer, "s3": s3_writer, + "weaviate": weaviate_writer, "pinecone": pinecone_writer, } diff --git a/unstructured/ingest/runner/writers/weaviate.py b/unstructured/ingest/runner/writers/weaviate.py new file mode 100644 index 0000000000..39dc242354 --- /dev/null +++ b/unstructured/ingest/runner/writers/weaviate.py @@ -0,0 +1,27 @@ +import typing as t + +from unstructured.utils import requires_dependencies + + +@requires_dependencies(["weaviate"], extras="weaviate") +def weaviate_writer( + host_url: str, + class_name: str, + batch_size: int = 100, + auth_keys: t.Optional[t.List[str]] = None, + **kwargs, +): + from unstructured.ingest.connector.weaviate import ( + SimpleWeaviateConfig, + WeaviateDestinationConnector, + WeaviateWriteConfig, + ) + + return WeaviateDestinationConnector( + write_config=WeaviateWriteConfig(batch_size=batch_size), + connector_config=SimpleWeaviateConfig( + host_url=host_url, + class_name=class_name, + auth_keys=auth_keys, + ), + )