Add component to index AWS OpenSearch #740

Merged: 14 commits, Jan 3, 2024
components/index_aws_opensearch/Dockerfile (new file, 23 additions)
@@ -0,0 +1,23 @@
FROM --platform=linux/amd64 python:3.8-slim as base

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/

FROM base
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]
components/index_aws_opensearch/README.md (new file, 55 additions)
@@ -0,0 +1,55 @@
# Index AWS OpenSearch

### Description
Component that takes embeddings of text snippets and indexes them into an AWS OpenSearch vector database.

### Inputs / outputs

**This component consumes:**

- text: string
- embedding: list<item: float>

**This component produces no data.**

### Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| host | str | The cluster endpoint of the AWS OpenSearch cluster where the embeddings will be indexed. For example, "my-opensearch-cluster.us-east-1.es.amazonaws.com" | / |
| region | str | The AWS region where the OpenSearch cluster is located. If not specified, the default region will be used. | / |
| index_name | str | The name of the index in the AWS OpenSearch cluster where the embeddings will be stored. | / |
| index_body | dict | A dictionary representing the body of the index request. This can include additional settings for the index operation. | / |
| port | int | The port number to connect to the AWS OpenSearch cluster. | 443 |
| use_ssl | bool | A boolean flag indicating whether to use SSL/TLS for the connection to the OpenSearch cluster. | True |
| verify_certs | bool | A boolean flag indicating whether to verify SSL certificates when connecting to the OpenSearch cluster. | True |
| pool_maxsize | int | The maximum size of the connection pool to the AWS OpenSearch cluster. | 20 |

### Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import Pipeline

pipeline = Pipeline(...)

dataset = pipeline.read(...)

dataset = dataset.apply(...)

dataset.write(
    "index_aws_opensearch",
    arguments={
        # Add arguments
        # "host": "my-opensearch-cluster.us-east-1.es.amazonaws.com",
        # "region": "eu-west-1",
        # "index_name": "test-index",
        # "port": 443,
        # "use_ssl": True,
        # "verify_certs": True,
        # "pool_maxsize": 20,
    }
)
```
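For reference, a filled-in sketch of the same call. The endpoint, index name, and `index_body` below are illustrative placeholders rather than values from this PR; the `index_body` assumes the OpenSearch k-NN plugin, and the dimension of 384 is an assumption that must match your embedding model:

```python
dataset.write(
    "index_aws_opensearch",
    arguments={
        "host": "my-opensearch-cluster.us-east-1.es.amazonaws.com",  # placeholder endpoint
        "region": "us-east-1",
        "index_name": "test-index",
        # Hypothetical index body enabling k-NN search on the embedding field.
        "index_body": {
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    "embedding": {"type": "knn_vector", "dimension": 384},
                    "text": {"type": "text"},
                }
            },
        },
    },
)
```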
components/index_aws_opensearch/fondant_component.yaml (new file, 44 additions)
@@ -0,0 +1,44 @@
name: Index AWS OpenSearch
description: Component that takes embeddings of text snippets and indexes them into an AWS OpenSearch vector database.
image: fndnt/index_aws_opensearch:dev
tags:
  - Data writing

consumes:
  text:
    type: string
  embedding:
    type: array
    items:
      type: float32

args:
  host:
    description: The cluster endpoint of the AWS OpenSearch cluster where the embeddings will be indexed. E.g. "my-test-domain.us-east-1.aoss.amazonaws.com"
    type: str
  region:
    description: The AWS region where the OpenSearch cluster is located. If not specified, the default region will be used.
    type: str
  index_name:
    description: The name of the index in the AWS OpenSearch cluster where the embeddings will be stored.
    type: str
  index_body:
    description: Parameters that specify index settings, mappings, and aliases for the newly created index.
    type: dict
  port:
    description: The port number to connect to the AWS OpenSearch cluster.
    type: int
    default: 443
  use_ssl:
    description: A boolean flag indicating whether to use SSL/TLS for the connection to the OpenSearch cluster.
    type: bool
    default: True
  verify_certs:
    description: A boolean flag indicating whether to verify SSL certificates when connecting to the OpenSearch cluster.
    type: bool
    default: True
  pool_maxsize:
    description: The maximum size of the connection pool to the AWS OpenSearch cluster.
    type: int
    default: 20

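The `consumes` section above means the component's `write` method receives a Dask DataFrame with a string `text` column and a list-of-float32 `embedding` column. A minimal sketch of such a frame, which could be handy for local testing (the column values are made up):

```python
import dask.dataframe as dd
import pandas as pd

# Toy data matching the schema declared under `consumes`:
# `text` as strings, `embedding` as lists of floats.
pdf = pd.DataFrame(
    {
        "text": ["first snippet", "second snippet"],
        "embedding": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
    }
)
ddf = dd.from_pandas(pdf, npartitions=1)
```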
components/index_aws_opensearch/requirements.txt (new file, 2 additions)
@@ -0,0 +1,2 @@
boto3==1.34.4
opensearch-py==2.4.2
components/index_aws_opensearch/src/main.py (new file, 61 additions)
@@ -0,0 +1,61 @@
from typing import Dict, Any
import dask.dataframe as dd
from fondant.component import DaskWriteComponent
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth


class IndexAWSOpenSearchComponent(DaskWriteComponent):
    def __init__(
        self,
        host: str,
        region: str,
        index_name: str,
        index_body: Dict[str, Any],
        port: int = 443,
> **Reviewer (Contributor):** I would not include the defaults here for the optional arguments, since you already define them in the spec and that can be misleading (they are not actually used here; it will always default to the ones defined in the spec). You can instead type them as optional, just so that it's clear that they do have default values and don't need to be explicitly defined.
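A sketch of what that suggestion could look like, as I read the reviewer's comment (this is not code from the PR): the defaults move out of the signature and the parameters are typed as `Optional`:

```python
from typing import Any, Dict, Optional

from fondant.component import DaskWriteComponent


class IndexAWSOpenSearchComponent(DaskWriteComponent):
    def __init__(
        self,
        host: str,
        region: str,
        index_name: str,
        index_body: Dict[str, Any],
        # Typed as Optional to signal that defaults live in the component
        # spec; the spec values are what actually apply at runtime.
        port: Optional[int] = None,
        use_ssl: Optional[bool] = None,
        verify_certs: Optional[bool] = None,
        pool_maxsize: Optional[int] = None,
        **kwargs,
    ):
        ...
```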

        use_ssl: bool = True,
        verify_certs: bool = True,
        pool_maxsize: int = 20,
        **kwargs,
    ):
        session = boto3.Session()
        credentials = session.get_credentials()
        auth = AWSV4SignerAuth(credentials, region)
        self.index_name = index_name
        self.client = OpenSearch(
            hosts=[{"host": host, "port": port}],
            http_auth=auth,
            use_ssl=use_ssl,
            verify_certs=verify_certs,
            connection_class=RequestsHttpConnection,
            pool_maxsize=pool_maxsize,
            **kwargs,
        )
        self.create_index(index_body)

    def create_index(self, index_body: Dict[str, Any]):
        """Creates an index in AWS OpenSearch.

        Args:
            index_body (Dict[str, Any]): Parameters that specify index settings, mappings, and aliases for the newly created index.
        """
        response = self.client.indices.create(self.index_name, body=index_body)
> **Reviewer (Contributor):** What happens if the index already exists? Will the index be overwritten? Maybe we add a check if an index exists.
>
> **Author (@shub-kris, Dec 21, 2023):** Let me check what it does if an index already exists.
>
> **Author (@shub-kris):** Added tests too.
>
> **Reviewer (Contributor):** Thanks :) I think you still need to modify the Docker image to be able to run the test with it, similar to this, so that you can run tests using `docker build . --target test`. Could you please update and test?
>
> **Author (@shub-kris, Dec 22, 2023):** Thanks for pointing it out. I almost missed it. But now it's updated and tested.
>
> **Reviewer (Contributor):** Great :) Did you manage to fix the failing pipelines? After installing pre-commit, you should run `pre-commit run --all-files`.
>
> **Author (@shub-kris):** Yupp.

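As a sketch of the existence check discussed above (not the code this PR landed on; `opensearch-py` exposes `indices.exists` and `indices.create`, both already used elsewhere in this diff, and `create_index_if_missing` is a name I made up):

```python
from typing import Any, Dict

from opensearchpy import OpenSearch


def create_index_if_missing(
    client: OpenSearch, index_name: str, index_body: Dict[str, Any]
) -> None:
    # indices.exists returns True when the index is already present;
    # skipping creation avoids the error OpenSearch raises for duplicates
    # and leaves the existing index (and its data) untouched.
    if not client.indices.exists(index=index_name):
        client.indices.create(index=index_name, body=index_body)
```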

    def write(self, dataframe: dd.DataFrame):
        """
        Writes the data from the given Dask DataFrame to the AWS OpenSearch index.

        Args:
            dataframe (dd.DataFrame): The Dask DataFrame containing the data to be written.
> **Reviewer (Contributor), suggested change:** drop the type from the docstring, i.e. replace
> `dataframe (dd.DataFrame): The Dask DataFrame containing the data to be written.`
> with
> `dataframe: The Dask DataFrame containing the data to be written.`

"""
if not self.client.indices.exists(index=self.index_name):
> **Reviewer (Contributor):** I think we could call the create_index here if the index doesn't exist.
>
> **Author (@shub-kris):** Sure, we can do that.
>
> **Author (@shub-kris, Dec 21, 2023):** It's not needed anymore, as I am doing it inside the `__init__` function.
>
> **Author (@shub-kris):** Removed the Exception too.

            raise ValueError(f"Index: {self.index_name} doesn't exist. Please Create")

        for part in dataframe.partitions:
> **Reviewer (Contributor), suggested change:**
>
>         for part in tqdm(
>             dataframe.partitions,
>             desc="Processing partitions",
>             total=dataframe.npartitions,
>         ):
>
> Useful to add logs in case the dataset is large. Similar to https://github.com/ml6team/fondant/blob/main/components/index_weaviate/src/main.py
>
> Don't forget to add tqdm to the list of requirements.

            df = part.compute()
            for row in df.itertuples():
                body = {
                    "embedding": row.embedding,
                    "text": row.text,
                }
                response = self.client.index(
                    index=self.index_name, id=str(row.Index), body=body
                )
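As an aside, `opensearch-py` also ships a `helpers.bulk` utility that batches requests instead of issuing one `client.index` call per row. A hedged sketch of that alternative, not what this PR implements (`bulk_write` is a name I made up):

```python
import dask.dataframe as dd
from opensearchpy import OpenSearch, helpers


def bulk_write(client: OpenSearch, index_name: str, dataframe: dd.DataFrame) -> None:
    """Index a Dask DataFrame partition by partition using bulk requests."""
    for part in dataframe.partitions:
        df = part.compute()
        # One action dict per row; _id mirrors the row index used above.
        actions = (
            {
                "_index": index_name,
                "_id": str(row.Index),
                "_source": {"embedding": row.embedding, "text": row.text},
            }
            for row in df.itertuples()
        )
        helpers.bulk(client, actions)
```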