From bc5ff2e1510a7f8fbb5f890b824269f878625157 Mon Sep 17 00:00:00 2001 From: Anush008 <46051506+Anush008@users.noreply.github.com> Date: Thu, 16 Nov 2023 23:28:50 +0530 Subject: [PATCH 1/7] feat: Qdrant support --- components/index_qdrant/Dockerfile | 23 ++++++ components/index_qdrant/README.md | 59 ++++++++++++++ .../index_qdrant/fondant_component.yaml | 79 +++++++++++++++++++ components/index_qdrant/requirements.txt | 1 + components/index_qdrant/src/main.py | 65 +++++++++++++++ 5 files changed, 227 insertions(+) create mode 100644 components/index_qdrant/Dockerfile create mode 100644 components/index_qdrant/README.md create mode 100644 components/index_qdrant/fondant_component.yaml create mode 100644 components/index_qdrant/requirements.txt create mode 100644 components/index_qdrant/src/main.py diff --git a/components/index_qdrant/Dockerfile b/components/index_qdrant/Dockerfile new file mode 100644 index 000000000..41aa27b5c --- /dev/null +++ b/components/index_qdrant/Dockerfile @@ -0,0 +1,23 @@ +FROM --platform=linux/amd64 python:3.8-slim as base + +# System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git -y + +# Install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Install Fondant +# This is split from other requirements to leverage caching +ARG FONDANT_VERSION=main +RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} + +# Set the working directory to the component folder +WORKDIR /component/src + +# Copy over src-files +COPY src/ . + +ENTRYPOINT ["fondant", "execute", "main"] \ No newline at end of file diff --git a/components/index_qdrant/README.md b/components/index_qdrant/README.md new file mode 100644 index 000000000..f54f23cab --- /dev/null +++ b/components/index_qdrant/README.md @@ -0,0 +1,59 @@ +# Index Qdrant + +### Description +A Fondant component to load textual data and embeddings into a [Qdrant](https://qdrant.tech/) database. + +### Inputs / outputs + +**This component consumes:** + +- text + - data: string + - embedding: list + +**This component produces no data.** + +> [!IMPORTANT] +> A Qdrant collection has to created in advance with appropriate vector configurations. Find out how to [here](https://qdrant.tech/documentation/concepts/collections/). + +### Arguments + +The component takes the following arguments to alter its behavior: + +| argument | type | description | default | +| -------- | ---- | ----------- | ------- | +| collection_name | str | The name of the Qdrant collection to upsert data into. | / | +| location | str | If `:memory:` - use in-memory Qdrant instance else use it as a url parameter. | None | +| batch_size | int | The batch size to use when uploading points to Qdrant. | 100 | +| parallelism | int | The number of parallel workers to use when uploading points to Qdrant. | None | +| url | str | Either host or str of 'Optional[scheme], host, Optional[port], Optional[prefix]'. Eg. `http://localhost:6333` | None | +| port | int | Port of the REST API interface.| 6333 | +| grpc_port | str | Port of the gRPC interface. | 6334 | +| prefer_grpc | bool | If `true` - use gRPC interface whenever possible in custom methods. | False | +| https | bool | If `true` - use HTTPS(SSL) protocol. | False | +| api_key | str | API key for authentication in Qdrant Cloud. | None | +| prefix | str | If set, add `prefix` to the REST URL path. Example: `service/v1` will result in `http://localhost:6333/service/v1/{qdrant-endpoint}` for REST API. | None | +| timeout | int | Timeout for REST and gRPC API requests. | 5 for REST, Unlimited for GRPC | +| host | str | Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. | None | +| path | str | Persistence path for QdrantLocal. Eg. `local_data/qdrant` | None | +| force_disable_check_same_thread | bool | Force disable check_same_thread for QdrantLocal sqlite connection. | False | + + +### Usage + +You can add this component to your pipeline using the following code: + +```python +from fondant.pipeline import ComponentOp + +index_qdrant_op = ComponentOp.from_registry( + name="index_qdrant", + # Add arguments + arguments={ + "collection_name": "fondant_loaded_data", + # "location": "http://localhost:6333", + # "batch_size": 100, + } +) +pipeline.add_op(index_qdrant_op, dependencies=[...]) +``` \ No newline at end of file diff --git a/components/index_qdrant/fondant_component.yaml b/components/index_qdrant/fondant_component.yaml new file mode 100644 index 000000000..9c71ef84f --- /dev/null +++ b/components/index_qdrant/fondant_component.yaml @@ -0,0 +1,79 @@ +name: Index Qdrant +description: A Fondant component to load textual data and embeddings into a Qdrant database. +image: fndnt/index_qdrant:dev +tags: + - Data writing + +consumes: + text: + fields: + data: + type: string + embedding: + type: array + items: + type: float32 + +args: + collection_name: + description: The name of the Qdrant collection to upsert data into. + type: str + location: + description: The location of the Qdrant instance. + type: str + default: None + batch_size: + description: The batch size to use when uploading points to Qdrant. + type: int + default: 64 + parallelism: + description: The number of parallel workers to use when uploading points to Qdrant. + type: int + default: 1 + url: + description: Either host or str of 'Optional[scheme], host, Optional[port], Optional[prefix]'. + type: str + default: None + port: + description: Port of the REST API interface. + type: int + default: 6333 + grpc_port: + description: Port of the gRPC interface. + type: int + default: 6334 + prefer_grpc: + description: If `true` - use gRPC interface whenever possible in custom methods. + type: bool + default: False + https: + description: If `true` - use HTTPS(SSL) protocol. + type: bool + default: False + api_key: + description: API key for authentication in Qdrant Cloud. + type: str + default: None + prefix: + description: If set, add `prefix` to the REST URL path. + type: str + default: None + timeout: + description: Timeout for API requests. + type: int + default: None + host: + description: Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. + type: str + default: None + path: + description: Persistence path for QdrantLocal. Eg. `local_data/qdrant` + type: str + default: None + force_disable_check_same_thread + description: Force disable check_same_thread for QdrantLocal sqlite connection. + type: bool + default: False + + + \ No newline at end of file diff --git a/components/index_qdrant/requirements.txt b/components/index_qdrant/requirements.txt new file mode 100644 index 000000000..05fcad6b2 --- /dev/null +++ b/components/index_qdrant/requirements.txt @@ -0,0 +1 @@ +qdrant_client==1.6.9 \ No newline at end of file diff --git a/components/index_qdrant/src/main.py b/components/index_qdrant/src/main.py new file mode 100644 index 000000000..363d552fd --- /dev/null +++ b/components/index_qdrant/src/main.py @@ -0,0 +1,65 @@ +from typing import List, Optional + +import dask.dataframe as dd +from fondant.component import DaskWriteComponent +from qdrant_client import QdrantClient, models +from qdrant_client.qdrant_fastembed import uuid + + +class IndexQdrantComponent(DaskWriteComponent): + def __init__( + self, + *_, + collection_name: str, + location: Optional[str] = None, + batch_size: int = 64, + parallelism: int = 1, + url: Optional[str] = None, + port: Optional[int] = 6333, + grpc_port: int = 6334, + prefer_grpc: bool = False, + https: Optional[bool] = None, + api_key: Optional[str] = None, + prefix: Optional[str] = None, + timeout: Optional[float] = None, + host: Optional[str] = None, + path: Optional[str] = None, + force_disable_check_same_thread: bool = False, + ): + self.client = QdrantClient( + location=location, + url=url, + port=port, + grpc_port=grpc_port, + prefer_grpc=prefer_grpc, + https=https, + api_key=api_key, + prefix=prefix, + timeout=timeout, + host=host, + path=path, + force_disable_check_same_thread=force_disable_check_same_thread, + ) + self.collection_name = collection_name + self.batch_size = batch_size + self.parallelism = parallelism + + def write(self, dataframe: dd.DataFrame) -> None: + records: List[models.Record] = [] + for part in dataframe.partitions: + df = part.compute() + for row in df.itertuples(): + payload = { + "id_": str(row.Index), + "passage": row.text_data, + } + id = str(uuid.uuid4()) + embedding = row.text_embedding + records.append(models.Record(id=id, payload=payload, vector=embedding)) + + self.client.upload_records( + collection_name=self.collection_name, + records=records, + batch_size=self.batch_size, + parallel=self.parallelism, + ) From b3e8ffa3c30c8bb1ce06e22bf630ca88a21dc063 Mon Sep 17 00:00:00 2001 From: Anush Date: Thu, 16 Nov 2023 23:50:18 +0530 Subject: [PATCH 2/7] chore: format fondant_component.yaml --- .../index_qdrant/fondant_component.yaml | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/components/index_qdrant/fondant_component.yaml b/components/index_qdrant/fondant_component.yaml index 9c71ef84f..c76fc9018 100644 --- a/components/index_qdrant/fondant_component.yaml +++ b/components/index_qdrant/fondant_component.yaml @@ -1,9 +1,10 @@ name: Index Qdrant -description: A Fondant component to load textual data and embeddings into a Qdrant database. -image: fndnt/index_qdrant:dev +description: >- + A Fondant component to load textual data and embeddings into a Qdrant + database. +image: 'fndnt/index_qdrant:dev' tags: - Data writing - consumes: text: fields: @@ -13,7 +14,6 @@ consumes: type: array items: type: float32 - args: collection_name: description: The name of the Qdrant collection to upsert data into. @@ -31,7 +31,9 @@ args: type: int default: 1 url: - description: Either host or str of 'Optional[scheme], host, Optional[port], Optional[prefix]'. + description: >- + Either host or str of 'Optional[scheme], host, Optional[port], + Optional[prefix]'. type: str default: None port: @@ -45,35 +47,34 @@ args: prefer_grpc: description: If `true` - use gRPC interface whenever possible in custom methods. type: bool - default: False + default: false https: description: If `true` - use HTTPS(SSL) protocol. type: bool - default: False + default: false api_key: description: API key for authentication in Qdrant Cloud. type: str default: None prefix: - description: If set, add `prefix` to the REST URL path. + description: 'If set, add `prefix` to the REST URL path.' type: str default: None timeout: description: Timeout for API requests. type: int default: None - host: - description: Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. + host: + description: >- + Host name of Qdrant service. If url and host are not set, defaults to + 'localhost'. type: str default: None path: description: Persistence path for QdrantLocal. Eg. `local_data/qdrant` type: str default: None - force_disable_check_same_thread + force_disable_check_same_thread: description: Force disable check_same_thread for QdrantLocal sqlite connection. type: bool - default: False - - - \ No newline at end of file + default: false From 8a6786e337a43fddfba6e00de02035094d714865 Mon Sep 17 00:00:00 2001 From: Anush008 <46051506+Anush008@users.noreply.github.com> Date: Sun, 19 Nov 2023 09:28:37 +0530 Subject: [PATCH 3/7] test: index_qdrant test --- components/index_qdrant/Dockerfile | 13 +++-- .../index_qdrant/fondant_component.yaml | 10 ++-- components/index_qdrant/src/main.py | 52 +++++++++++++------ components/index_qdrant/test_requirements.txt | 1 + components/index_qdrant/tests/qdrant_test.py | 41 +++++++++++++++ 5 files changed, 95 insertions(+), 22 deletions(-) create mode 100644 components/index_qdrant/test_requirements.txt create mode 100644 components/index_qdrant/tests/qdrant_test.py diff --git a/components/index_qdrant/Dockerfile b/components/index_qdrant/Dockerfile index 41aa27b5c..35e7cc91f 100644 --- a/components/index_qdrant/Dockerfile +++ b/components/index_qdrant/Dockerfile @@ -15,9 +15,16 @@ ARG FONDANT_VERSION=main RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} # Set the working directory to the component folder -WORKDIR /component/src +WORKDIR /component +COPY src/ src/ +ENV PYTHONPATH "${PYTHONPATH}:./src" -# Copy over src-files -COPY src/ . +FROM base as test +COPY test_requirements.txt . +RUN pip3 install --no-cache-dir -r test_requirements.txt +COPY tests/ tests/ +RUN python -m pytest tests +FROM base +WORKDIR /component/src ENTRYPOINT ["fondant", "execute", "main"] \ No newline at end of file diff --git a/components/index_qdrant/fondant_component.yaml b/components/index_qdrant/fondant_component.yaml index c76fc9018..b3013b2f7 100644 --- a/components/index_qdrant/fondant_component.yaml +++ b/components/index_qdrant/fondant_component.yaml @@ -47,11 +47,11 @@ args: prefer_grpc: description: If `true` - use gRPC interface whenever possible in custom methods. type: bool - default: false + default: False https: description: If `true` - use HTTPS(SSL) protocol. type: bool - default: false + default: False api_key: description: API key for authentication in Qdrant Cloud. type: str @@ -77,4 +77,8 @@ args: force_disable_check_same_thread: description: Force disable check_same_thread for QdrantLocal sqlite connection. type: bool - default: false + default: False + client: + description: Optional Qdrant client instance to use. + type: object + default: None \ No newline at end of file diff --git a/components/index_qdrant/src/main.py b/components/index_qdrant/src/main.py index 363d552fd..aca9a0d1c 100644 --- a/components/index_qdrant/src/main.py +++ b/components/index_qdrant/src/main.py @@ -1,3 +1,4 @@ +import ast from typing import List, Optional import dask.dataframe as dd @@ -7,7 +8,7 @@ class IndexQdrantComponent(DaskWriteComponent): - def __init__( + def __init__( # noqa self, *_, collection_name: str, @@ -25,26 +26,37 @@ def __init__( host: Optional[str] = None, path: Optional[str] = None, force_disable_check_same_thread: bool = False, + client: Optional[QdrantClient] = None, ): - self.client = QdrantClient( - location=location, - url=url, - port=port, - grpc_port=grpc_port, - prefer_grpc=prefer_grpc, - https=https, - api_key=api_key, - prefix=prefix, - timeout=timeout, - host=host, - path=path, - force_disable_check_same_thread=force_disable_check_same_thread, - ) + """Initialize the IndexQdrantComponent with the component parameters.""" + if client is None: + self.client = QdrantClient( + location=location, + url=url, + port=port, + grpc_port=grpc_port, + prefer_grpc=prefer_grpc, + https=https, + api_key=api_key, + prefix=prefix, + timeout=timeout, + host=host, + path=path, + force_disable_check_same_thread=force_disable_check_same_thread, + ) + else: + self.client = client self.collection_name = collection_name self.batch_size = batch_size self.parallelism = parallelism def write(self, dataframe: dd.DataFrame) -> None: + """ + Writes the data from the given Dask DataFrame to the Qdrant collection. + + Args: + dataframe (dd.DataFrame): The Dask DataFrame containing the data to be written. + """ records: List[models.Record] = [] for part in dataframe.partitions: df = part.compute() @@ -54,7 +66,14 @@ def write(self, dataframe: dd.DataFrame) -> None: "passage": row.text_data, } id = str(uuid.uuid4()) - embedding = row.text_embedding + # Check if 'text_embedding' attribute is a string. + # If it is, use ast.literal_eval to safely evaluate the string and convert it into a Python list of floats. + # else (i.e., it is already a list), it is directly assigned. + embedding = ( + ast.literal_eval(row.text_embedding) + if isinstance(row.text_embedding, str) + else row.text_embedding + ) records.append(models.Record(id=id, payload=payload, vector=embedding)) self.client.upload_records( @@ -62,4 +81,5 @@ def write(self, dataframe: dd.DataFrame) -> None: records=records, batch_size=self.batch_size, parallel=self.parallelism, + wait=True, ) diff --git a/components/index_qdrant/test_requirements.txt b/components/index_qdrant/test_requirements.txt new file mode 100644 index 000000000..2a929edcc --- /dev/null +++ b/components/index_qdrant/test_requirements.txt @@ -0,0 +1 @@ +pytest==7.4.2 diff --git a/components/index_qdrant/tests/qdrant_test.py b/components/index_qdrant/tests/qdrant_test.py new file mode 100644 index 000000000..542f285ff --- /dev/null +++ b/components/index_qdrant/tests/qdrant_test.py @@ -0,0 +1,41 @@ +import tempfile +import uuid + +import dask.dataframe as dd + +from src.main import IndexQdrantComponent, QdrantClient, models + + +def test_qdrant_write(): + """ + Test case for the write method of the IndexQdrantComponent class. + + This test creates a temporary collection using a QdrantClient. + Writes data to it using the write method of the IndexQdrantComponent. + Asserts that the count of entries in the collection is equal to the expected number of entries. + """ + collection_name = uuid.uuid4().hex + with tempfile.TemporaryDirectory() as tmpdir: + client = QdrantClient(path=str(tmpdir)) + entries = 100 + + client.create_collection( + collection_name=collection_name, + vectors_config=models.VectorParams(distance=models.Distance.COSINE, size=5), + ) + component = IndexQdrantComponent(collection_name=collection_name, client=client) + + dask_dataframe = dd.DataFrame.from_dict( + { + "text_data": [ + "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo", + ] + * entries, + "text_embedding": [[0.1, 0.2, 0.3, 0.4, 0.5]] * entries, + }, + npartitions=1, + ) + + component.write(dask_dataframe) + + assert client.count(collection_name).count == entries From b0725807dad3007171d34d5b41404d2dc9848ed2 Mon Sep 17 00:00:00 2001 From: Anush008 <46051506+Anush008@users.noreply.github.com> Date: Sun, 19 Nov 2023 09:30:55 +0530 Subject: [PATCH 4/7] docs: client param README.md --- components/index_qdrant/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/components/index_qdrant/README.md b/components/index_qdrant/README.md index f54f23cab..04172747f 100644 --- a/components/index_qdrant/README.md +++ b/components/index_qdrant/README.md @@ -37,6 +37,7 @@ The component takes the following arguments to alter its behavior: | host | str | Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. | None | | path | str | Persistence path for QdrantLocal. Eg. `local_data/qdrant` | None | | force_disable_check_same_thread | bool | Force disable check_same_thread for QdrantLocal sqlite connection. | False | +| client | object | Optional Qdrant client instance to use. | None | ### Usage From 7624525f25baaa7d03eade17a6c97514a386b440 Mon Sep 17 00:00:00 2001 From: Anush008 <46051506+Anush008@users.noreply.github.com> Date: Sun, 19 Nov 2023 09:44:29 +0530 Subject: [PATCH 5/7] chore: remove client param --- components/index_qdrant/README.md | 1 - .../index_qdrant/fondant_component.yaml | 6 +--- components/index_qdrant/src/main.py | 35 ++++++++----------- components/index_qdrant/tests/qdrant_test.py | 10 +++++- 4 files changed, 24 insertions(+), 28 deletions(-) diff --git a/components/index_qdrant/README.md b/components/index_qdrant/README.md index 04172747f..f54f23cab 100644 --- a/components/index_qdrant/README.md +++ b/components/index_qdrant/README.md @@ -37,7 +37,6 @@ The component takes the following arguments to alter its behavior: | host | str | Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. | None | | path | str | Persistence path for QdrantLocal. Eg. `local_data/qdrant` | None | | force_disable_check_same_thread | bool | Force disable check_same_thread for QdrantLocal sqlite connection. | False | -| client | object | Optional Qdrant client instance to use. | None | ### Usage diff --git a/components/index_qdrant/fondant_component.yaml b/components/index_qdrant/fondant_component.yaml index b3013b2f7..108c285c5 100644 --- a/components/index_qdrant/fondant_component.yaml +++ b/components/index_qdrant/fondant_component.yaml @@ -77,8 +77,4 @@ args: force_disable_check_same_thread: description: Force disable check_same_thread for QdrantLocal sqlite connection. type: bool - default: False - client: - description: Optional Qdrant client instance to use. - type: object - default: None \ No newline at end of file + default: False \ No newline at end of file diff --git a/components/index_qdrant/src/main.py b/components/index_qdrant/src/main.py index aca9a0d1c..ffe52d7de 100644 --- a/components/index_qdrant/src/main.py +++ b/components/index_qdrant/src/main.py @@ -26,29 +26,22 @@ def __init__( # noqa host: Optional[str] = None, path: Optional[str] = None, force_disable_check_same_thread: bool = False, - client: Optional[QdrantClient] = None, ): """Initialize the IndexQdrantComponent with the component parameters.""" - if client is None: - self.client = QdrantClient( - location=location, - url=url, - port=port, - grpc_port=grpc_port, - prefer_grpc=prefer_grpc, - https=https, - api_key=api_key, - prefix=prefix, - timeout=timeout, - host=host, - path=path, - force_disable_check_same_thread=force_disable_check_same_thread, - ) - else: - self.client = client - self.collection_name = collection_name - self.batch_size = batch_size - self.parallelism = parallelism + self.client = QdrantClient( + location=location, + url=url, + port=port, + grpc_port=grpc_port, + prefer_grpc=prefer_grpc, + https=https, + api_key=api_key, + prefix=prefix, + timeout=timeout, + host=host, + path=path, + force_disable_check_same_thread=force_disable_check_same_thread, + ) def write(self, dataframe: dd.DataFrame) -> None: """ diff --git a/components/index_qdrant/tests/qdrant_test.py b/components/index_qdrant/tests/qdrant_test.py index 542f285ff..61d4e2cbc 100644 --- a/components/index_qdrant/tests/qdrant_test.py +++ b/components/index_qdrant/tests/qdrant_test.py @@ -23,7 +23,13 @@ def test_qdrant_write(): collection_name=collection_name, vectors_config=models.VectorParams(distance=models.Distance.COSINE, size=5), ) - component = IndexQdrantComponent(collection_name=collection_name, client=client) + # There cannot be multiple clients accessing the same local persistent storage + # Qdrant server supports multiple concurrent access + del client + + component = IndexQdrantComponent( + collection_name=collection_name, path=str(tmpdir) + ) dask_dataframe = dd.DataFrame.from_dict( { @@ -37,5 +43,7 @@ def test_qdrant_write(): ) component.write(dask_dataframe) + del component + client = QdrantClient(path=str(tmpdir)) assert client.count(collection_name).count == entries From 4096c5c0cd0514d06f8921e28b40d22de4b1c927 Mon Sep 17 00:00:00 2001 From: Anush008 <46051506+Anush008@users.noreply.github.com> Date: Sun, 19 Nov 2023 11:57:16 +0530 Subject: [PATCH 6/7] fix: attribute assignment --- components/index_qdrant/src/main.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/components/index_qdrant/src/main.py b/components/index_qdrant/src/main.py index ffe52d7de..da83a7c52 100644 --- a/components/index_qdrant/src/main.py +++ b/components/index_qdrant/src/main.py @@ -42,6 +42,9 @@ def __init__( # noqa path=path, force_disable_check_same_thread=force_disable_check_same_thread, ) + self.collection_name = collection_name + self.batch_size = batch_size + self.parallelism = parallelism def write(self, dataframe: dd.DataFrame) -> None: """ From 38acad628e08bb4a777c342a9e31bc897bde8fc2 Mon Sep 17 00:00:00 2001 From: Anush008 <46051506+Anush008@users.noreply.github.com> Date: Tue, 21 Nov 2023 15:17:39 +0530 Subject: [PATCH 7/7] docs: README autogen --- components/index_qdrant/README.md | 66 ++++++++++++------- .../index_qdrant/fondant_component.yaml | 5 +- components/index_qdrant/src/main.py | 4 +- components/index_qdrant/tests/qdrant_test.py | 3 +- 4 files changed, 48 insertions(+), 30 deletions(-) diff --git a/components/index_qdrant/README.md b/components/index_qdrant/README.md index f54f23cab..0335f6835 100644 --- a/components/index_qdrant/README.md +++ b/components/index_qdrant/README.md @@ -1,7 +1,7 @@ # Index Qdrant ### Description -A Fondant component to load textual data and embeddings into a [Qdrant](https://qdrant.tech/) database. +A Fondant component to load textual data and embeddings into a Qdrant database. NOTE: A Qdrant collection has to be created in advance with the appropriate configurations. https://qdrant.tech/documentation/concepts/collections/ ### Inputs / outputs @@ -13,9 +13,6 @@ A Fondant component to load textual data and embeddings into a [Qdrant](https:// **This component produces no data.** -> [!IMPORTANT] -> A Qdrant collection has to created in advance with appropriate vector configurations. Find out how to [here](https://qdrant.tech/documentation/concepts/collections/). - ### Arguments The component takes the following arguments to alter its behavior: @@ -23,21 +20,20 @@ The component takes the following arguments to alter its behavior: | argument | type | description | default | | -------- | ---- | ----------- | ------- | | collection_name | str | The name of the Qdrant collection to upsert data into. | / | -| location | str | If `:memory:` - use in-memory Qdrant instance else use it as a url parameter. | None | -| batch_size | int | The batch size to use when uploading points to Qdrant. | 100 | -| parallelism | int | The number of parallel workers to use when uploading points to Qdrant. | None | -| url | str | Either host or str of 'Optional[scheme], host, Optional[port], Optional[prefix]'. Eg. `http://localhost:6333` | None | -| port | int | Port of the REST API interface.| 6333 | -| grpc_port | str | Port of the gRPC interface. | 6334 | -| prefer_grpc | bool | If `true` - use gRPC interface whenever possible in custom methods. | False | -| https | bool | If `true` - use HTTPS(SSL) protocol. | False | -| api_key | str | API key for authentication in Qdrant Cloud. | None | -| prefix | str | If set, add `prefix` to the REST URL path. Example: `service/v1` will result in `http://localhost:6333/service/v1/{qdrant-endpoint}` for REST API. | None | -| timeout | int | Timeout for REST and gRPC API requests. | 5 for REST, Unlimited for GRPC | -| host | str | Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. | None | -| path | str | Persistence path for QdrantLocal. Eg. `local_data/qdrant` | None | -| force_disable_check_same_thread | bool | Force disable check_same_thread for QdrantLocal sqlite connection. | False | - +| location | str | The location of the Qdrant instance. | / | +| batch_size | int | The batch size to use when uploading points to Qdrant. | 64 | +| parallelism | int | The number of parallel workers to use when uploading points to Qdrant. | 1 | +| url | str | Either host or str of 'Optional[scheme], host, Optional[port], Optional[prefix]'. | / | +| port | int | Port of the REST API interface. | 6333 | +| grpc_port | int | Port of the gRPC interface. | 6334 | +| prefer_grpc | bool | If `true` - use gRPC interface whenever possible in custom methods. | / | +| https | bool | If `true` - use HTTPS(SSL) protocol. | / | +| api_key | str | API key for authentication in Qdrant Cloud. | / | +| prefix | str | If set, add `prefix` to the REST URL path. | / | +| timeout | int | Timeout for API requests. | / | +| host | str | Host name of Qdrant service. If url and host are not set, defaults to 'localhost'. | / | +| path | str | Persistence path for QdrantLocal. Eg. `local_data/qdrant` | / | +| force_disable_check_same_thread | bool | Force disable check_same_thread for QdrantLocal sqlite connection. | / | ### Usage @@ -46,14 +42,34 @@ You can add this component to your pipeline using the following code: ```python from fondant.pipeline import ComponentOp + index_qdrant_op = ComponentOp.from_registry( name="index_qdrant", - # Add arguments arguments={ - "collection_name": "fondant_loaded_data", - # "location": "http://localhost:6333", - # "batch_size": 100, + # Add arguments + # "collection_name": , + # "location": , + # "batch_size": 64, + # "parallelism": 1, + # "url": , + # "port": 6333, + # "grpc_port": 6334, + # "prefer_grpc": False, + # "https": False, + # "api_key": , + # "prefix": , + # "timeout": 0, + # "host": , + # "path": , + # "force_disable_check_same_thread": False, } ) -pipeline.add_op(index_qdrant_op, dependencies=[...]) -``` \ No newline at end of file +pipeline.add_op(index_qdrant_op, dependencies=[...]) #Add previous component as dependency +``` + +### Testing + +You can run the tests using docker with BuildKit. From this directory, run: +``` +docker build . --target test +``` diff --git a/components/index_qdrant/fondant_component.yaml b/components/index_qdrant/fondant_component.yaml index 108c285c5..6feb3b257 100644 --- a/components/index_qdrant/fondant_component.yaml +++ b/components/index_qdrant/fondant_component.yaml @@ -1,7 +1,8 @@ name: Index Qdrant description: >- - A Fondant component to load textual data and embeddings into a Qdrant - database. + A Fondant component to load textual data and embeddings into a Qdrant database. + NOTE: A Qdrant collection has to be created in advance with the appropriate configurations. https://qdrant.tech/documentation/concepts/collections/ + image: 'fndnt/index_qdrant:dev' tags: - Data writing diff --git a/components/index_qdrant/src/main.py b/components/index_qdrant/src/main.py index da83a7c52..06b68426d 100644 --- a/components/index_qdrant/src/main.py +++ b/components/index_qdrant/src/main.py @@ -8,7 +8,7 @@ class IndexQdrantComponent(DaskWriteComponent): - def __init__( # noqa + def __init__( self, *_, collection_name: str, @@ -63,7 +63,7 @@ def write(self, dataframe: dd.DataFrame) -> None: } id = str(uuid.uuid4()) # Check if 'text_embedding' attribute is a string. - # If it is, use ast.literal_eval to safely evaluate the string and convert it into a Python list of floats. + # If it is, safely evaluate and convert it into a list of floats. # else (i.e., it is already a list), it is directly assigned. embedding = ( ast.literal_eval(row.text_embedding) diff --git a/components/index_qdrant/tests/qdrant_test.py b/components/index_qdrant/tests/qdrant_test.py index 61d4e2cbc..835c7de01 100644 --- a/components/index_qdrant/tests/qdrant_test.py +++ b/components/index_qdrant/tests/qdrant_test.py @@ -28,7 +28,8 @@ def test_qdrant_write(): del client component = IndexQdrantComponent( - collection_name=collection_name, path=str(tmpdir) + collection_name=collection_name, + path=str(tmpdir), ) dask_dataframe = dd.DataFrame.from_dict(