[cherry-pick] bulkwriter sample read csv #1686

Merged
1 commit merged on Sep 12, 2023
3 changes: 3 additions & 0 deletions .gitignore
@@ -32,3 +32,6 @@ htmlcov/
debug/
.codecov.yml
coverage.xml

# Example data
/examples/bulk_writer
101 changes: 101 additions & 0 deletions examples/data/train_embeddings.csv

Large diffs are not rendered by default.

294 changes: 226 additions & 68 deletions examples/example_bulkwriter.py
@@ -14,10 +14,12 @@
import json
import random
import threading
import time
import pandas as pd
import numpy as np

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example_bulkwriter")

from pymilvus import (
connections,
@@ -30,6 +32,7 @@
bulk_import,
get_import_progress,
list_import_jobs,
BulkInsertState,
)

# minio
@@ -41,65 +44,119 @@
HOST = '127.0.0.1'
PORT = '19530'

COLLECTION_NAME = "test_abc"
DIM = 256
CSV_COLLECTION_NAME = "test_csv"
ALL_TYPES_COLLECTION_NAME = "test_all_types"
DIM = 512

def gen_binary_vector():
raw_vector = [random.randint(0, 1) for i in range(DIM)]
binary_vectors = np.packbits(raw_vector, axis=-1).tolist()
return binary_vectors

def gen_float_vector():
return [random.random() for _ in range(DIM)]
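
# A quick sanity check of the two helpers above (illustrative sketch, not part of the
# committed example): np.packbits folds DIM random bits into DIM // 8 bytes, which is the
# payload size a BINARY_VECTOR field of dimension DIM expects, while a FLOAT_VECTOR row
# keeps DIM float values.
def _check_vector_helpers():
    assert len(gen_binary_vector()) == DIM // 8
    assert len(gen_float_vector()) == DIM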

def create_connection():
print(f"\nCreate connection...")
connections.connect(host=HOST, port=PORT)
print(f"\nConnected")


def build_collection():
if utility.has_collection(COLLECTION_NAME):
utility.drop_collection(COLLECTION_NAME)

field1 = FieldSchema(name="id", dtype=DataType.INT64, auto_id=True, is_primary=True)
field2 = FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM)
field3 = FieldSchema(name="desc", dtype=DataType.VARCHAR, max_length=100)
schema = CollectionSchema(fields=[field1, field2, field3])
collection = Collection(name=COLLECTION_NAME, schema=schema)
print("Collection created")
def build_csv_collection():
print(f"\n===================== create collection ====================")
if utility.has_collection(CSV_COLLECTION_NAME):
utility.drop_collection(CSV_COLLECTION_NAME)

fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
FieldSchema(name="label", dtype=DataType.VARCHAR, max_length=512),
]
schema = CollectionSchema(fields=fields)
collection = Collection(name=CSV_COLLECTION_NAME, schema=schema)
print(f"Collection '{collection.name}' created")
return collection.schema

def build_all_type_schema(bin_vec: bool):
print(f"\n===================== build all types schema ====================")
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
FieldSchema(name="bool", dtype=DataType.BOOL),
FieldSchema(name="int8", dtype=DataType.INT8),
FieldSchema(name="int16", dtype=DataType.INT16),
FieldSchema(name="int32", dtype=DataType.INT32),
FieldSchema(name="int64", dtype=DataType.INT64),
FieldSchema(name="float", dtype=DataType.FLOAT),
FieldSchema(name="double", dtype=DataType.DOUBLE),
FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="json", dtype=DataType.JSON),
FieldSchema(name="vector", dtype=DataType.BINARY_VECTOR, dim=DIM) if bin_vec else FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
]
schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
return schema

def read_sample_data(file_path: str, writer: [LocalBulkWriter, RemoteBulkWriter]):
csv_data = pd.read_csv(file_path)
print(f"The csv file has {csv_data.shape[0]} rows")
for i in range(csv_data.shape[0]):
row = {}
for col in csv_data.columns.values:
if col == "vector":
vec = json.loads(csv_data[col][i])
row[col] = vec
else:
row[col] = csv_data[col][i]

writer.append_row(row)
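
# Illustrative sketch (assumption): examples/data/train_embeddings.csv is not rendered in
# this diff, but read_sample_data() implies that the "vector" column stores a JSON-encoded
# list while the other columns hold plain scalars. A compatible file for the csv
# collection's path/vector/label fields could be generated roughly like this:
def _write_sample_csv(file_path: str, row_count: int = 10):
    rows = {
        "path": [f"path_{i}" for i in range(row_count)],
        "vector": [json.dumps(gen_float_vector()) for _ in range(row_count)],
        "label": [f"label_{i}" for i in range(row_count)],
    }
    pd.DataFrame(rows).to_csv(file_path, index=False)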

def test_local_writer_json(schema: CollectionSchema):
local_writer = LocalBulkWriter(schema=schema,
local_path="/tmp/bulk_data",
segment_size=4*1024*1024,
file_type=BulkFileType.JSON_RB,
)
for i in range(10):
local_writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})
print(f"\n===================== test local JSON writer ====================")
with LocalBulkWriter(
schema=schema,
local_path="/tmp/bulk_writer",
segment_size=4*1024*1024,
file_type=BulkFileType.JSON_RB,
) as local_writer:
read_sample_data("./data/train_embeddings.csv", local_writer)
local_writer.commit()
batch_files = local_writer.batch_files

local_writer.commit()
print("test local writer done!")
print(local_writer.data_path)
return local_writer.data_path
print(f"Test local writer done! output local files: {batch_files}")

def test_local_writer_npy(schema: CollectionSchema):
local_writer = LocalBulkWriter(schema=schema, local_path="/tmp/bulk_data", segment_size=4*1024*1024)
for i in range(10000):
local_writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})

local_writer.commit()
print("test local writer done!")
print(local_writer.data_path)
return local_writer.data_path
def test_local_writer_npy(schema: CollectionSchema):
print(f"\n===================== test local npy writer ====================")
with LocalBulkWriter(
schema=schema,
local_path="/tmp/bulk_writer",
segment_size=4*1024*1024,
) as local_writer:
read_sample_data("./data/train_embeddings.csv", local_writer)
local_writer.commit()
batch_files = local_writer.batch_files

print(f"Test local writer done! output local files: {batch_files}")

def _append_row(writer: LocalBulkWriter, begin: int, end: int):
for i in range(begin, end):
writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})

def test_parallel_append(schema: CollectionSchema):
local_writer = LocalBulkWriter(schema=schema,
local_path="/tmp/bulk_data",
segment_size=1000 * 1024 * 1024,
file_type=BulkFileType.JSON_RB,
)
print(f"\n===================== test parallel append ====================")
def _append_row(writer: LocalBulkWriter, begin: int, end: int):
try:
for i in range(begin, end):
writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
except Exception as e:
print("failed to append row!")

local_writer = LocalBulkWriter(
schema=schema,
local_path="/tmp/bulk_writer",
segment_size=1000 * 1024 * 1024,
file_type=BulkFileType.JSON_RB,
)
threads = []
thread_count = 100
rows_per_thread = 1000
rows_per_thread = 100
for k in range(thread_count):
x = threading.Thread(target=_append_row, args=(local_writer, k*rows_per_thread, (k+1)*rows_per_thread,))
threads.append(x)
@@ -108,6 +165,7 @@ def test_parallel_append(schema: CollectionSchema):

for th in threads:
th.join()
print(f"Thread '{th.name}' finished")

local_writer.commit()
print(f"Append finished, {thread_count*rows_per_thread} rows")
@@ -118,40 +176,128 @@ def test_parallel_append(schema: CollectionSchema):
print("Verify the output content...")
rows = data['rows']
assert len(rows) == thread_count*rows_per_thread
for i in range(len(rows)):
row = rows[i]
assert row['desc'] == f"description_{row['id']}"
for row in rows:
pa = row['path']
label = row['label']
assert pa.replace("path_", "") == label.replace("label_", "")
print("Data is correct")
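
# Illustrative sketch (assumption): the lines that populate `data` above are collapsed in
# this diff. A JSON_RB batch file written by the bulk writer is read back as a JSON object
# whose top-level "rows" list feeds the verification loop, roughly like this:
def _load_rows(json_file_path: str) -> list:
    with open(json_file_path, "r") as f:
        return json.load(f)["rows"]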


def test_remote_writer(schema: CollectionSchema):
remote_writer = RemoteBulkWriter(schema=schema,
remote_path="bulk_data",
local_path="/tmp/bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
segment_size=50 * 1024 * 1024,
)
print(f"\n===================== test remote writer ====================")
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
segment_size=50 * 1024 * 1024,
) as remote_writer:
read_sample_data("./data/train_embeddings.csv", remote_writer)
remote_writer.commit()
batch_files = remote_writer.batch_files

print(f"Test remote writer done! output remote files: {batch_files}")


def test_all_types_writer(bin_vec: bool, schema: CollectionSchema)->list:
print(f"\n===================== all types test ====================")
remote_writer = RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
)

print("Append rows")
for i in range(10000):
if i % 1000 == 0:
logger.info(f"{i} rows have been appended to the remote writer")
remote_writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})

row = {
"id": i,
"bool": True if i%5 == 0 else False,
"int8": i%128,
"int16": i%1000,
"int32": i%100000,
"int64": i,
"float": i/3,
"double": i/7,
"varchar": f"varchar_{i}",
"json": {"dummy": i, "ok": f"name_{i}"},
"vector": gen_binary_vector() if bin_vec else gen_float_vector(),
f"dynamic_{i}": i,
}
remote_writer.append_row(row)

print("Generate data files...")
remote_writer.commit()
print("test remote writer done!")
print(remote_writer.data_path)
return remote_writer.data_path

print(f"Data files have been uploaded: {remote_writer.batch_files}")
return remote_writer.batch_files
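
# Illustrative sketch (assumption, based on how batch_files is consumed below): the remote
# writer groups its output files per committed segment, so batch_files is a list of
# file-path lists and test_call_bulkinsert() passes one inner list per do_bulk_insert() call.
def _print_batch_files(batch_files: list):
    for i, files in enumerate(batch_files):
        print(f"batch {i}: {files}")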


def test_call_bulkinsert(schema: CollectionSchema, batch_files: list):
print(f"\n===================== test call bulkinsert ====================")
if utility.has_collection(ALL_TYPES_COLLECTION_NAME):
utility.drop_collection(ALL_TYPES_COLLECTION_NAME)

collection = Collection(name=ALL_TYPES_COLLECTION_NAME, schema=schema)
print(f"Collection '{collection.name}' created")

task_ids = []
for files in batch_files:
task_id = utility.do_bulk_insert(collection_name=ALL_TYPES_COLLECTION_NAME, files=files)
task_ids.append(task_id)
print(f"Created a bulkinsert task, task id: {task_id}")

while len(task_ids) > 0:
print("Wait 1 second to check bulkinsert tasks state...")
time.sleep(1)
for id in task_ids:
state = utility.get_bulk_insert_state(task_id=id)
if state.state == BulkInsertState.ImportFailed or state.state == BulkInsertState.ImportFailedAndCleaned:
print(f"The task {state.task_id} failed, reason: {state.failed_reason}")
task_ids.remove(id)
elif state.state == BulkInsertState.ImportCompleted:
print(f"The task {state.task_id} completed")
task_ids.remove(id)

print(f"Collection row number: {collection.num_entities}")


def test_retrieve_imported_data(bin_vec: bool):
collection = Collection(name=ALL_TYPES_COLLECTION_NAME)
print("Create index...")
index_param = {
"index_type": "BIN_FLAT",
"params": {},
"metric_type": "HAMMING"
} if bin_vec else {
"index_type": "FLAT",
"params": {},
"metric_type": "L2"
}
collection.create_index(field_name="vector", index_params=index_param)

ids = [100, 5000]
print(f"Load collection and query items {ids}")
collection.load()
expr = f"id in {ids}"
print(expr)
results = collection.query(expr=expr, output_fields=["*", "vector"])
print("Query results:")
for item in results:
print(item)
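
# Illustrative follow-up sketch (assumption, float-vector run only): besides query(), the
# imported data can be searched through the FLAT/L2 index created above once the
# collection has been loaded.
def _search_imported_data(top_k: int = 3):
    collection = Collection(name=ALL_TYPES_COLLECTION_NAME)
    results = collection.search(
        data=[gen_float_vector()],
        anns_field="vector",
        param={"metric_type": "L2", "params": {}},
        limit=top_k,
        output_fields=["varchar"],
    )
    for hits in results:
        for hit in hits:
            print(hit)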

def test_cloud_bulkinsert():
url = "https://_your_cloud_server_url_"
cluster_id = "_your_cloud_instance_id_"

print(f"===================== import files to cloud vectordb ====================")
print(f"\n===================== import files to cloud vectordb ====================")
object_url = "_your_object_storage_service_url_"
object_url_access_key = "_your_object_storage_service_access_key_"
object_url_secret_key = "_your_object_storage_service_secret_key_"
@@ -161,11 +307,11 @@ def test_cloud_bulkinsert():
access_key=object_url_access_key,
secret_key=object_url_secret_key,
cluster_id=cluster_id,
collection_name=COLLECTION_NAME,
collection_name=CSV_COLLECTION_NAME,
)
print(resp)

print(f"===================== get import job progress ====================")
print(f"\n===================== get import job progress ====================")
job_id = resp['data']['jobId']
resp = get_import_progress(
url=url,
@@ -174,7 +320,7 @@ def test_cloud_bulkinsert():
)
print(resp)

print(f"===================== list import jobs ====================")
print(f"\n===================== list import jobs ====================")
resp = list_import_jobs(
url=url,
cluster_id=cluster_id,
@@ -186,12 +332,24 @@

if __name__ == '__main__':
create_connection()
schema = build_collection()

schema = build_csv_collection()
test_local_writer_json(schema)
test_local_writer_npy(schema)
test_remote_writer(schema)
test_parallel_append(schema)

# float vectors + all scalar types
schema = build_all_type_schema(bin_vec=False)
batch_files = test_all_types_writer(bin_vec=False, schema=schema)
test_call_bulkinsert(schema, batch_files)
test_retrieve_imported_data(bin_vec=False)

# binary vectors + all scalar types
schema = build_all_type_schema(bin_vec=True)
batch_files = test_all_types_writer(bin_vec=True, schema=schema)
test_call_bulkinsert(schema, batch_files)
test_retrieve_imported_data(bin_vec=True)

# test_cloud_bulkinsert()
