Skip to content

Commit

Permalink
feat: add v2 pinecone destination connector (#3286)
Browse files Browse the repository at this point in the history
This PR adds a V2 version of the Pinecone destination connector
  • Loading branch information
ahmetmeleq authored Jul 1, 2024
1 parent a18b21c commit 72f28d7
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.14.10-dev2
## 0.14.10-dev3

### Enhancements

Expand Down
7 changes: 4 additions & 3 deletions test_unstructured_ingest/dest/pinecone.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RANDOM_SUFFIX=$((RANDOM % 100000 + 1))

# Set the variables with default values if they're not set in the environment
PINECONE_INDEX=${PINECONE_INDEX:-"ingest-test-$RANDOM_SUFFIX"}
PINECONE_HOST_POSTFIX=${PINECONE_HOST_POSTFIX:-"4627-b74a"}
PINECONE_ENVIRONMENT=${PINECONE_ENVIRONMENT:-"us-east1-gcp"}
PINECONE_PROJECT_ID=${PINECONE_PROJECT_ID:-"art8iaj"}

Expand Down Expand Up @@ -96,7 +97,7 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--input-path example-docs/book-war-and-peace-1225p.txt \
--work-dir "$WORK_DIR" \
--chunking-strategy by_title \
--chunk-combine-text-under-n-chars 200 --chunk-new-after-n-chars 2500 --chunk-max-characters 38000 --chunk-multipage-sections \
--chunk-combine-text-under-n-chars 150 --chunk-new-after-n-chars 1500 --chunk-max-characters 2500 --chunk-multipage-sections \
--embedding-provider "langchain-huggingface" \
pinecone \
--api-key "$PINECONE_API_KEY" \
Expand All @@ -116,7 +117,7 @@ while [ "$num_of_vectors_remote" -eq 0 ] && [ "$attempt" -lt 4 ]; do

num_of_vectors_remote=$(curl --request POST \
-s \
--url "https://$PINECONE_INDEX-$PINECONE_PROJECT_ID.svc.$PINECONE_ENVIRONMENT.pinecone.io/describe_index_stats" \
--url "https://$PINECONE_INDEX-$PINECONE_PROJECT_ID.svc.aped-$PINECONE_HOST_POSTFIX.pinecone.io/describe_index_stats" \
--header "accept: application/json" \
--header "content-type: application/json" \
--header "Api-Key: $PINECONE_API_KEY" | jq -r '.totalVectorCount')
Expand All @@ -125,7 +126,7 @@ while [ "$num_of_vectors_remote" -eq 0 ] && [ "$attempt" -lt 4 ]; do
attempt=$((attempt + 1))
done

EXPECTED=1404
EXPECTED=1825

if [ "$num_of_vectors_remote" -ne $EXPECTED ]; then
echo "Number of vectors in Pinecone are $num_of_vectors_remote when the expected number is $EXPECTED. Test failed."
Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.10-dev2" # pragma: no cover
__version__ = "0.14.10-dev3" # pragma: no cover
2 changes: 2 additions & 0 deletions unstructured/ingest/v2/cli/cmds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .local import local_dest_cmd, local_src_cmd
from .onedrive import onedrive_drive_src_cmd
from .opensearch import opensearch_dest_cmd, opensearch_src_cmd
from .pinecone import pinecone_dest_cmd
from .weaviate import weaviate_dest_cmd

src_cmds = [
Expand Down Expand Up @@ -50,6 +51,7 @@
gcs_dest_cmd,
local_dest_cmd,
opensearch_dest_cmd,
pinecone_dest_cmd,
s3_dest_cmd,
sftp_dest_cmd,
weaviate_dest_cmd,
Expand Down
62 changes: 62 additions & 0 deletions unstructured/ingest/v2/cli/cmds/pinecone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from dataclasses import dataclass

import click

from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.processes.connectors.pinecone import CONNECTOR_TYPE


@dataclass
class PineconeCliConnectionConfig(CliConfig):
    """CLI surface for the Pinecone connection settings (API key, index, environment)."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the click options that map onto PineconeConnectionConfig fields."""
        # Every connection option is a required string; build them from a
        # (flag, help-text) table so the shared kwargs live in one place.
        specs = [
            ("--api-key", "API key for Pinecone."),
            ("--index-name", "Name of the index to connect to. Example: my-index"),
            ("--environment", "Environment to connect to. Example: us-east-1"),
        ]
        return [
            click.Option([flag], required=True, type=str, help=help_text)
            for flag, help_text in specs
        ]


@dataclass
class PineconeCliUploaderConfig(CliConfig):
    """CLI surface for the Pinecone uploader settings (batching / parallelism)."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the click options that map onto PineconeUploaderConfig fields."""
        options = [
            click.Option(
                ["--batch-size"],
                default=100,
                type=int,
                help="Number of records per batch",
            ),
            click.Option(
                # BUG FIX: the uploader dataclass field is `num_of_processes`, but a
                # lone `--num-processes` flag makes click derive the parameter name
                # `num_processes`, so the CLI value never reached the config field.
                # Click derives the name from the LONGEST long flag, so listing
                # --num-of-processes first fixes the mapping while keeping
                # --num-processes as a backward-compatible alias.
                ["--num-of-processes", "--num-processes"],
                default=4,
                type=int,
                help="Number of processes to use for uploading",
            ),
        ]
        return options


# Register the `pinecone` destination subcommand: wires the connection and
# uploader CLI option groups into the shared DestCmd scaffolding. The command
# name is the connector's registry key ("pinecone").
pinecone_dest_cmd = DestCmd(
    cmd_name=CONNECTOR_TYPE,
    connection_config=PineconeCliConnectionConfig,
    uploader_config=PineconeCliUploaderConfig,
)
56 changes: 56 additions & 0 deletions unstructured/ingest/v2/examples/example_pinecone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os
from pathlib import Path

from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.pinecone import (
PineconeAccessConfig,
PineconeConnectionConfig,
PineconeUploaderConfig,
PineconeUploadStagerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

# Resolve repo-relative paths: this example file sits five directory levels
# below the repository root.
base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
# NOTE(review): output_path is defined but not referenced below — confirm
# whether it was meant to be passed to the pipeline or can be dropped.
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
    logger.info(f"Writing all content in: {work_dir.resolve()}")
    # End-to-end local-file -> Pinecone pipeline: index, download, partition,
    # chunk, embed, stage, then upload to the configured Pinecone index.
    Pipeline.from_configs(
        context=ProcessorConfig(work_dir=str(work_dir.resolve())),
        indexer_config=LocalIndexerConfig(
            input_path=str(docs_path.resolve()) + "/book-war-and-peace-1p.txt"
        ),
        # NOTE(review): download_dir receives a Path here while work_dir above
        # is stringified — confirm LocalDownloaderConfig accepts Path objects.
        downloader_config=LocalDownloaderConfig(download_dir=download_path),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(strategy="fast"),
        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
        embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"),
        destination_connection_config=PineconeConnectionConfig(
            # You'll need to set PINECONE_API_KEY environment variable to run this example
            access_config=PineconeAccessConfig(api_key=os.getenv("PINECONE_API_KEY")),
            # The defaults below are human-readable placeholders, not working
            # values: set PINECONE_INDEX / PINECONE_ENVIRONMENT to run this.
            index_name=os.getenv(
                "PINECONE_INDEX",
                default="your index name here. e.g. my-index,"
                "or define in environment variable PINECONE_INDEX",
            ),
            environment=os.getenv(
                "PINECONE_ENVIRONMENT",
                default="your environment name here. e.g. us-east-1,"
                "or define in environment variable PINECONE_ENVIRONMENT",
            ),
        ),
        stager_config=PineconeUploadStagerConfig(),
        uploader_config=PineconeUploaderConfig(batch_size=10, num_of_processes=2),
    ).run()
10 changes: 3 additions & 7 deletions unstructured/ingest/v2/processes/connectors/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def parse_date_string(date_string: str) -> date:
logger.debug(f"date {date_string} string not a timestamp: {e}")
return parser.parse(date_string)

@classmethod
def conform_dict(cls, data: dict) -> dict:
@staticmethod
def conform_dict(data: dict) -> dict:
"""
Prepares dictionary in the format that Chroma requires
"""
Expand All @@ -96,11 +96,7 @@ def run(
) -> Path:
with open(elements_filepath) as elements_file:
elements_contents = json.load(elements_file)

conformed_elements = []
for element in elements_contents:
conformed_elements.append(self.conform_dict(data=element))

conformed_elements = [self.conform_dict(data=element) for element in elements_contents]
output_path = Path(output_dir) / Path(f"{output_filename}.json")
with open(output_path, "w") as output_file:
json.dump(conformed_elements, output_file)
Expand Down
182 changes: 182 additions & 0 deletions unstructured/ingest/v2/processes/connectors/pinecone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import json
import multiprocessing as mp
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional

from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.v2.interfaces import (
AccessConfig,
ConnectionConfig,
UploadContent,
Uploader,
UploaderConfig,
UploadStager,
UploadStagerConfig,
)
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
add_destination_entry,
)
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies

if TYPE_CHECKING:
from pinecone import Index as PineconeIndex


CONNECTOR_TYPE = "pinecone"


@dataclass
class PineconeAccessConfig(AccessConfig):
    """Secret material needed to authenticate against Pinecone."""

    # Also accepted under the alternate name "pinecone_api_key" via overload_name.
    api_key: Optional[str] = enhanced_field(default=None, overload_name="pinecone_api_key")


@dataclass
class PineconeConnectionConfig(ConnectionConfig):
    """Identifies a specific Pinecone index and how to authenticate to it."""

    # Name of the target index, e.g. "my-index".
    index_name: str
    # Pinecone environment identifier, e.g. "us-east-1". NOTE(review): within
    # this file it is only read for logging, never passed to the client —
    # confirm whether the v3 Pinecone client still needs it.
    environment: str
    # Marked sensitive so the API key is redacted when the config is serialized/logged.
    access_config: PineconeAccessConfig = enhanced_field(sensitive=True)

    @requires_dependencies(["pinecone"], extras="pinecone")
    def get_index(self) -> "PineconeIndex":
        """Build a Pinecone client and return a handle to the configured index.

        The decorator raises with install instructions when the `pinecone`
        extra is missing; the imports stay local so the module loads without it.
        """
        from pinecone import Pinecone

        from unstructured import __version__ as unstructured_version

        pc = Pinecone(
            api_key=self.access_config.api_key,
            # Tags outgoing requests so Pinecone can attribute traffic to this library.
            source_tag=f"unstructured=={unstructured_version}",
        )

        index = pc.Index(self.index_name)
        logger.debug(f"Connected to index: {pc.describe_index(self.index_name)}")
        return index


@dataclass
class PineconeUploadStagerConfig(UploadStagerConfig):
    """No stager-specific options yet; exists for interface symmetry."""

    pass


@dataclass
class PineconeUploaderConfig(UploaderConfig):
    """Tunables for how records are pushed to Pinecone."""

    # Number of records sent per upsert request.
    batch_size: int = 100
    # Worker processes used for parallel upserts; 1 means serial upload.
    num_of_processes: int = 4


@dataclass
class PineconeUploadStager(UploadStager):
    """Reshapes partitioned element JSON into Pinecone vector records on disk."""

    upload_stager_config: PineconeUploadStagerConfig = field(
        default_factory=lambda: PineconeUploadStagerConfig()
    )

    @staticmethod
    def conform_dict(element_dict: dict) -> dict:
        """Convert one element dict into the record shape Pinecone's upsert expects."""
        # Pull embeddings/text out first: flatten_dict makes the remaining
        # fields indexable, while element_serialized keeps a reloadable copy of
        # the element without the bulky text/embedding payloads.
        embeddings = element_dict.pop("embeddings", None)
        text = element_dict.pop("text", None)
        metadata = {
            "text": text,
            "element_serialized": json.dumps(element_dict),
        }
        metadata.update(
            flatten_dict(
                element_dict,
                separator="-",
                flatten_lists=True,
                remove_none=True,
            )
        )
        return {"id": str(uuid.uuid4()), "values": embeddings, "metadata": metadata}

    def run(
        self,
        elements_filepath: Path,
        output_dir: Path,
        output_filename: str,
        **kwargs: Any,
    ) -> Path:
        """Read staged elements, conform each one, and write the result as JSON."""
        with open(elements_filepath) as fp:
            raw_elements = json.load(fp)

        staged = [self.conform_dict(element_dict=entry) for entry in raw_elements]

        destination = Path(output_dir) / Path(f"{output_filename}.json")
        destination.parent.mkdir(parents=True, exist_ok=True)

        with open(destination, "w") as fp:
            json.dump(staged, fp)
        return destination


@dataclass
class PineconeUploader(Uploader):
    """Pushes staged vector records into a Pinecone index, optionally in parallel."""

    upload_config: PineconeUploaderConfig
    connection_config: PineconeConnectionConfig
    connector_type: str = CONNECTOR_TYPE

    @DestinationConnectionError.wrap
    def check_connection(self):
        """Raise DestinationConnectionError if the index cannot be reached."""
        _ = self.connection_config.get_index()

    @requires_dependencies(["pinecone"], extras="pinecone")
    def upsert_batch(self, batch):
        """Upsert one batch of records, translating API failures into our error type."""
        from pinecone.core.client.exceptions import PineconeApiException

        try:
            index = self.connection_config.get_index()
            response = index.upsert(batch)
        except PineconeApiException as api_error:
            raise DestinationConnectionError(f"http error: {api_error}") from api_error
        else:
            # Success-only path: `response` is guaranteed bound here, which the
            # original post-try placement did not make explicit. (Also dropped a
            # stray `# noqa: E203` that suppressed nothing.)
            logger.debug(f"results: {response}")

    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
        """Load every staged file and upsert its records in batches."""
        elements_dict = []
        for content in contents:
            with open(content.path) as elements_file:
                elements_dict.extend(json.load(elements_file))

        logger.info(
            f"writing document batches to destination"
            f" index named {self.connection_config.index_name}"
            f" environment named {self.connection_config.environment}"
            f" with batch size {self.upload_config.batch_size}"
            f" with {self.upload_config.num_of_processes} (number of) processes"
        )

        pinecone_batch_size = self.upload_config.batch_size

        if self.upload_config.num_of_processes == 1:
            # Serial path avoids process-pool overhead for the single-worker case.
            for chunk in chunk_generator(elements_dict, pinecone_batch_size):
                self.upsert_batch(chunk)
        else:
            # pool.map needs a materialized list; a generator would be consumed
            # during pickling/distribution.
            with mp.Pool(
                processes=self.upload_config.num_of_processes,
            ) as pool:
                pool.map(
                    self.upsert_batch, list(chunk_generator(elements_dict, pinecone_batch_size))
                )


# Register this connector in the v2 destination registry so pipelines and the
# CLI can resolve all of its pieces by the "pinecone" connector type.
add_destination_entry(
    destination_type=CONNECTOR_TYPE,
    entry=DestinationRegistryEntry(
        connection_config=PineconeConnectionConfig,
        uploader=PineconeUploader,
        uploader_config=PineconeUploaderConfig,
        upload_stager=PineconeUploadStager,
        upload_stager_config=PineconeUploadStagerConfig,
    ),
)

0 comments on commit 72f28d7

Please sign in to comment.