bulkwriter sample read csv (#1686)
Signed-off-by: yhmo <[email protected]>
yhmo authored Sep 12, 2023
1 parent b165651 commit 9dd5903
Showing 8 changed files with 474 additions and 103 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -32,3 +32,6 @@ htmlcov/
debug/
.codecov.yml
coverage.xml

# Example data
/examples/bulk_writer
101 changes: 101 additions & 0 deletions examples/data/train_embeddings.csv

Large diffs are not rendered by default.

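As a point of reference, here is a minimal sketch (not part of the commit) of how a compatible train_embeddings.csv could be generated. The column names (path, vector, label), the 512-dimensional float vectors, and the JSON-encoded vector cells mirror what read_sample_data() and the test_csv schema in the example below expect; the row count and output path here are illustrative.

import json
import random

import pandas as pd

DIM = 512   # matches the FLOAT_VECTOR dim used by the example schema
ROWS = 100  # illustrative row count

# Each "vector" cell is JSON-encoded so that read_sample_data() can
# recover the list of floats with json.loads().
rows = {
    "path": [f"path_{i}" for i in range(ROWS)],
    "vector": [json.dumps([random.random() for _ in range(DIM)]) for _ in range(ROWS)],
    "label": [f"label_{i}" for i in range(ROWS)],
}
pd.DataFrame(rows).to_csv("data/train_embeddings.csv", index=False)
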
294 changes: 226 additions & 68 deletions examples/example_bulkwriter.py
@@ -14,10 +14,12 @@
import json
import random
import threading
import time
import pandas as pd
import numpy as np

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example_bulkwriter")

from pymilvus import (
connections,
@@ -30,6 +32,7 @@
bulk_import,
get_import_progress,
list_import_jobs,
BulkInsertState,
)

# minio
@@ -41,65 +44,119 @@
HOST = '127.0.0.1'
PORT = '19530'

COLLECTION_NAME = "test_abc"
DIM = 256
CSV_COLLECTION_NAME = "test_csv"
ALL_TYPES_COLLECTION_NAME = "test_all_types"
DIM = 512

def gen_binary_vector():
raw_vector = [random.randint(0, 1) for i in range(DIM)]
binary_vectors = np.packbits(raw_vector, axis=-1).tolist()
return binary_vectors

def gen_float_vector():
return [random.random() for _ in range(DIM)]
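
# Editor's note (not part of the commit): a minimal sketch of why
# gen_binary_vector() returns DIM/8 values -- np.packbits() packs the DIM
# random bits into bytes, which is how a BINARY_VECTOR of dim=512 is
# represented (512 bits -> 64 uint8 values).
def _binary_vector_sketch():
    bits = [random.randint(0, 1) for _ in range(DIM)]
    packed = np.packbits(bits, axis=-1).tolist()
    assert len(packed) == DIM // 8  # 64 bytes for a 512-bit vector
    return packed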

def create_connection():
print(f"\nCreate connection...")
connections.connect(host=HOST, port=PORT)
print(f"\nConnected")


def build_collection():
if utility.has_collection(COLLECTION_NAME):
utility.drop_collection(COLLECTION_NAME)

field1 = FieldSchema(name="id", dtype=DataType.INT64, auto_id=True, is_primary=True)
field2 = FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM)
field3 = FieldSchema(name="desc", dtype=DataType.VARCHAR, max_length=100)
schema = CollectionSchema(fields=[field1, field2, field3])
collection = Collection(name=COLLECTION_NAME, schema=schema)
print("Collection created")
def build_csv_collection():
print(f"\n===================== create collection ====================")
if utility.has_collection(CSV_COLLECTION_NAME):
utility.drop_collection(CSV_COLLECTION_NAME)

fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
FieldSchema(name="label", dtype=DataType.VARCHAR, max_length=512),
]
schema = CollectionSchema(fields=fields)
collection = Collection(name=CSV_COLLECTION_NAME, schema=schema)
print(f"Collection '{collection.name}' created")
return collection.schema

def build_all_type_schema(bin_vec: bool):
print(f"\n===================== build all types schema ====================")
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
FieldSchema(name="bool", dtype=DataType.BOOL),
FieldSchema(name="int8", dtype=DataType.INT8),
FieldSchema(name="int16", dtype=DataType.INT16),
FieldSchema(name="int32", dtype=DataType.INT32),
FieldSchema(name="int64", dtype=DataType.INT64),
FieldSchema(name="float", dtype=DataType.FLOAT),
FieldSchema(name="double", dtype=DataType.DOUBLE),
FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="json", dtype=DataType.JSON),
FieldSchema(name="vector", dtype=DataType.BINARY_VECTOR, dim=DIM) if bin_vec else FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
]
schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
return schema

def read_sample_data(file_path: str, writer: [LocalBulkWriter, RemoteBulkWriter]):
csv_data = pd.read_csv(file_path)
print(f"The csv file has {csv_data.shape[0]} rows")
for i in range(csv_data.shape[0]):
row = {}
for col in csv_data.columns.values:
if col == "vector":
vec = json.loads(csv_data[col][i])
row[col] = vec
else:
row[col] = csv_data[col][i]

writer.append_row(row)

def test_local_writer_json(schema: CollectionSchema):
local_writer = LocalBulkWriter(schema=schema,
local_path="/tmp/bulk_data",
segment_size=4*1024*1024,
file_type=BulkFileType.JSON_RB,
)
for i in range(10):
local_writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})
print(f"\n===================== test local JSON writer ====================")
with LocalBulkWriter(
schema=schema,
local_path="/tmp/bulk_writer",
segment_size=4*1024*1024,
file_type=BulkFileType.JSON_RB,
) as local_writer:
read_sample_data("./data/train_embeddings.csv", local_writer)
local_writer.commit()
batch_files = local_writer.batch_files

local_writer.commit()
print("test local writer done!")
print(local_writer.data_path)
return local_writer.data_path
print(f"Test local writer done! output local files: {batch_files}")

def test_local_writer_npy(schema: CollectionSchema):
local_writer = LocalBulkWriter(schema=schema, local_path="/tmp/bulk_data", segment_size=4*1024*1024)
for i in range(10000):
local_writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})

local_writer.commit()
print("test local writer done!")
print(local_writer.data_path)
return local_writer.data_path
def test_local_writer_npy(schema: CollectionSchema):
print(f"\n===================== test local npy writer ====================")
with LocalBulkWriter(
schema=schema,
local_path="/tmp/bulk_writer",
segment_size=4*1024*1024,
) as local_writer:
read_sample_data("./data/train_embeddings.csv", local_writer)
local_writer.commit()
batch_files = local_writer.batch_files

print(f"Test local writer done! output local files: {batch_files}")

def _append_row(writer: LocalBulkWriter, begin: int, end: int):
for i in range(begin, end):
writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})

def test_parallel_append(schema: CollectionSchema):
local_writer = LocalBulkWriter(schema=schema,
local_path="/tmp/bulk_data",
segment_size=1000 * 1024 * 1024,
file_type=BulkFileType.JSON_RB,
)
print(f"\n===================== test parallel append ====================")
def _append_row(writer: LocalBulkWriter, begin: int, end: int):
try:
for i in range(begin, end):
writer.append_row({"path": f"path_{i}", "vector": gen_float_vector(), "label": f"label_{i}"})
except Exception as e:
print("failed to append row!")

local_writer = LocalBulkWriter(
schema=schema,
local_path="/tmp/bulk_writer",
segment_size=1000 * 1024 * 1024,
file_type=BulkFileType.JSON_RB,
)
threads = []
thread_count = 100
rows_per_thread = 1000
rows_per_thread = 100
for k in range(thread_count):
x = threading.Thread(target=_append_row, args=(local_writer, k*rows_per_thread, (k+1)*rows_per_thread,))
threads.append(x)
@@ -108,6 +165,7 @@ def test_parallel_append(schema: CollectionSchema):

for th in threads:
th.join()
print(f"Thread '{th.name}' finished")

local_writer.commit()
print(f"Append finished, {thread_count*rows_per_thread} rows")
@@ -118,40 +176,128 @@ def test_parallel_append(schema: CollectionSchema):
print("Verify the output content...")
rows = data['rows']
assert len(rows) == thread_count*rows_per_thread
for i in range(len(rows)):
row = rows[i]
assert row['desc'] == f"description_{row['id']}"
for row in rows:
pa = row['path']
label = row['label']
assert pa.replace("path_", "") == label.replace("label_", "")
print("Data is correct")


def test_remote_writer(schema: CollectionSchema):
remote_writer = RemoteBulkWriter(schema=schema,
remote_path="bulk_data",
local_path="/tmp/bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
segment_size=50 * 1024 * 1024,
)
print(f"\n===================== test remote writer ====================")
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
segment_size=50 * 1024 * 1024,
) as remote_writer:
read_sample_data("./data/train_embeddings.csv", remote_writer)
remote_writer.commit()
batch_files = remote_writer.batch_files

print(f"Test remote writer done! output remote files: {batch_files}")


def test_all_types_writer(bin_vec: bool, schema: CollectionSchema)->list:
print(f"\n===================== all types test ====================")
remote_writer = RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
endpoint=MINIO_ADDRESS,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name="a-bucket",
),
)

print("Append rows")
for i in range(10000):
if i % 1000 == 0:
logger.info(f"{i} rows has been append to remote writer")
remote_writer.append_row({"id": i, "vector": [random.random() for _ in range(DIM)], "desc": f"description_{i}"})

row = {
"id": i,
"bool": True if i%5 == 0 else False,
"int8": i%128,
"int16": i%1000,
"int32": i%100000,
"int64": i,
"float": i/3,
"double": i/7,
"varchar": f"varchar_{i}",
"json": {"dummy": i, "ok": f"name_{i}"},
"vector": gen_binary_vector() if bin_vec else gen_float_vector(),
f"dynamic_{i}": i,
}
remote_writer.append_row(row)

print("Generate data files...")
remote_writer.commit()
print("test remote writer done!")
print(remote_writer.data_path)
return remote_writer.data_path

print(f"Data files have been uploaded: {remote_writer.batch_files}")
return remote_writer.batch_files


def test_call_bulkinsert(schema: CollectionSchema, batch_files: list):
print(f"\n===================== test call bulkinsert ====================")
if utility.has_collection(ALL_TYPES_COLLECTION_NAME):
utility.drop_collection(ALL_TYPES_COLLECTION_NAME)

collection = Collection(name=ALL_TYPES_COLLECTION_NAME, schema=schema)
print(f"Collection '{collection.name}' created")

task_ids = []
for files in batch_files:
task_id = utility.do_bulk_insert(collection_name=ALL_TYPES_COLLECTION_NAME, files=files)
task_ids.append(task_id)
print(f"Create a bulkinert task, task id: {task_id}")

while len(task_ids) > 0:
print("Wait 1 second to check bulkinsert tasks state...")
time.sleep(1)
for id in list(task_ids):  # iterate over a copy so items can be removed safely
state = utility.get_bulk_insert_state(task_id=id)
if state.state == BulkInsertState.ImportFailed or state.state == BulkInsertState.ImportFailedAndCleaned:
print(f"The task {state.task_id} failed, reason: {state.failed_reason}")
task_ids.remove(id)
elif state.state == BulkInsertState.ImportCompleted:
print(f"The task {state.task_id} completed")
task_ids.remove(id)

print(f"Collection row number: {collection.num_entities}")


def test_retrieve_imported_data(bin_vec: bool):
collection = Collection(name=ALL_TYPES_COLLECTION_NAME)
print("Create index...")
index_param = {
"index_type": "BIN_FLAT",
"params": {},
"metric_type": "HAMMING"
} if bin_vec else {
"index_type": "FLAT",
"params": {},
"metric_type": "L2"
}
collection.create_index(field_name="vector", index_params=index_param)

ids = [100, 5000]
print(f"Load collection and query items {ids}")
collection.load()
expr = f"id in {ids}"
print(expr)
results = collection.query(expr=expr, output_fields=["*", "vector"])
print("Query results:")
for item in results:
print(item)

def test_cloud_bulkinsert():
url = "https://_your_cloud_server_url_"
cluster_id = "_your_cloud_instance_id_"

print(f"===================== import files to cloud vectordb ====================")
print(f"\n===================== import files to cloud vectordb ====================")
object_url = "_your_object_storage_service_url_"
object_url_access_key = "_your_object_storage_service_access_key_"
object_url_secret_key = "_your_object_storage_service_secret_key_"
@@ -161,11 +307,11 @@ def test_cloud_bulkinsert():
access_key=object_url_access_key,
secret_key=object_url_secret_key,
cluster_id=cluster_id,
collection_name=COLLECTION_NAME,
collection_name=CSV_COLLECTION_NAME,
)
print(resp)

print(f"===================== get import job progress ====================")
print(f"\n===================== get import job progress ====================")
job_id = resp['data']['jobId']
resp = get_import_progress(
url=url,
@@ -174,7 +320,7 @@ def test_cloud_bulkinsert():
)
print(resp)

print(f"===================== list import jobs ====================")
print(f"\n===================== list import jobs ====================")
resp = list_import_jobs(
url=url,
cluster_id=cluster_id,
@@ -186,12 +332,24 @@

if __name__ == '__main__':
create_connection()
schema = build_collection()

schema = build_csv_collection()
test_local_writer_json(schema)
test_local_writer_npy(schema)
test_remote_writer(schema)
test_parallel_append(schema)

# float vectors + all scalar types
schema = build_all_type_schema(bin_vec=False)
batch_files = test_all_types_writer(bin_vec=False, schema=schema)
test_call_bulkinsert(schema, batch_files)
test_retrieve_imported_data(bin_vec=False)

# binary vectors + all scalar types
schema = build_all_type_schema(bin_vec=True)
batch_files = test_all_types_writer(bin_vec=True, schema=schema)
test_call_bulkinsert(schema, batch_files)
test_retrieve_imported_data(bin_vec=True)

# test_cloud_bulkinsert()
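
# Editor's note (not part of the commit): a minimal sketch of how the local
# writer's JSON_RB output could be inspected after commit(). It assumes
# batch_files is a list of per-batch file lists (as test_call_bulkinsert
# treats it) and that each JSON file wraps its rows under a top-level
# "rows" key (as test_parallel_append assumes).
def _preview_first_rows(batch_files: list, count: int = 3):
    first_file = batch_files[0][0]
    with open(first_file, "r") as f:
        data = json.load(f)
    for row in data["rows"][:count]:
        # Each row carries the CSV columns: "path", "vector" (512 floats), "label".
        print({k: (v[:4] if k == "vector" else v) for k, v in row.items()})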
