Modify bulkinsert example #1228

Merged 1 commit on Nov 18, 2022
30 changes: 13 additions & 17 deletions examples/example_bulkload.py → examples/example_bulkinsert.py
@@ -24,7 +24,8 @@
# rocksmq:
# path: /tmp/milvus/rdb_data
# storageType: local
MILVUS_DATA_PATH = "/tmp/milvus/data/"

FILES_PATH = "/tmp/milvus_bulkinsert/"

# Milvus service address
_HOST = '127.0.0.1'
@@ -126,7 +127,7 @@ def gen_json_rowbased(num, path, tag):
json.dump(data, json_file)


# Bulkload for row-based files, each file is converted to a task.
# For row-based files, each file is converted to a task. Each time you can call do_bulk_insert() to insert one file.
# The rootcoord maintains a task list; each idle datanode will receive a task. If no datanode is available, the task
# will be put into a pending list to wait. The max size of the pending list is 32; if the number of new tasks exceeds
# the spare capacity of the pending list, the do_bulk_insert() method will return an error.
@@ -140,27 +141,22 @@ def gen_json_rowbased(num, path, tag):
# But if segment.maxSize in milvus.yml is set to a small value, shardNum*2, shardNum*3, or even more segments
# could be generated.
def bulk_insert_rowbased(row_count_each_file, file_count, tag, partition_name = None):
# make sure the data path is exist
exist = os.path.exists(MILVUS_DATA_PATH)
if not exist:
os.mkdir(MILVUS_DATA_PATH)

file_names = []
for i in range(file_count):
file_names.append("rows_" + str(i) + ".json")
# make sure the files folder is created
os.makedirs(name=FILES_PATH, exist_ok=True)

task_ids = []
for filename in file_names:
print("Generate row-based file:", MILVUS_DATA_PATH + filename)
gen_json_rowbased(row_count_each_file, MILVUS_DATA_PATH + filename, tag)
print("Import row-based file:", filename)
for i in range(file_count):
file_path = FILES_PATH + "rows_" + str(i) + ".json"
print("Generate row-based file:", file_path)
gen_json_rowbased(row_count_each_file, file_path, tag)
print("Import row-based file:", file_path)
task_id = utility.do_bulk_insert(collection_name=_COLLECTION_NAME,
partition_name=partition_name,
files=[filename])
files=[file_path])
task_ids.append(task_id)
return wait_tasks_persisted(task_ids)

# wait all bulk insert tasks to be a certain state
# Wait for all bulk insert tasks to reach a certain state
# return the states of all the tasks, including failed tasks
def wait_tasks_to_state(task_ids, state_code):
wait_ids = task_ids
@@ -191,7 +187,7 @@ def wait_tasks_to_state(task_ids, state_code):

# Get bulk insert task state to check whether the data file has been parsed and persisted successfully.
# Persisted state doesn't mean the data is queryable; to query the data, you need to wait until the segment is
# loaded into memory.
# indexed successfully and loaded into memory.
def wait_tasks_persisted(task_ids):
print("=========================================================================================================")
states = wait_tasks_to_state(task_ids, BulkInsertState.ImportPersisted)
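As a usage note on the revised example (not part of this diff): each call to do_bulk_insert() imports a single row-based JSON file, so the example writes one file per task under FILES_PATH. Below is a minimal sketch of generating and importing one such file; it assumes a local Milvus that can read /tmp/milvus_bulkinsert/, a top-level "rows" list as the file layout, and the film_id/films fields from the docstring example further down, so treat the exact format and names as illustrative only.

import json
import os

from pymilvus import connections, utility

FILES_PATH = "/tmp/milvus_bulkinsert/"

# Assumed row-based layout: a top-level "rows" list, one JSON object per entity.
os.makedirs(FILES_PATH, exist_ok=True)
file_path = FILES_PATH + "rows_0.json"
with open(file_path, "w") as f:
    json.dump({"rows": [{"film_id": i, "films": [0.1 * i, 0.2 * i]} for i in range(10)]}, f)

connections.connect()
# One file per call; the returned task_id can be polled with get_bulk_insert_state().
task_id = utility.do_bulk_insert(collection_name="test_collection_bulk_insert", files=[file_path])
print(task_id)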
23 changes: 23 additions & 0 deletions pymilvus/orm/utility.py
@@ -670,6 +670,17 @@ def do_bulk_insert(collection_name: str, files: list, partition_name=None, timeo

:raises BaseException: If collection_name doesn't exist.
:raises BaseException: If the files input is illegal.

:example:
>>> from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
>>> connections.connect()
>>> schema = CollectionSchema([
... FieldSchema("film_id", DataType.INT64, is_primary=True),
... FieldSchema("films", dtype=DataType.FLOAT_VECTOR, dim=2)
... ])
>>> collection = Collection("test_collection_bulk_insert", schema)
>>> task_id = utility.do_bulk_insert(collection_name=collection.name, files=['data.json'])
>>> print(task_id)
"""
return _get_connection(using).do_bulk_insert(collection_name, partition_name, files, timeout=timeout, **kwargs)

@@ -682,6 +693,13 @@ def get_bulk_insert_state(task_id, timeout=None, using="default", **kwargs) -> B

:return: BulkInsertState
:rtype: BulkInsertState

:example:
>>> from pymilvus import connections, utility, BulkInsertState
>>> connections.connect()
>>> state = utility.get_bulk_insert_state(task_id=task_id) # task_id is returned by do_bulk_insert()
>>> if state.state in (BulkInsertState.ImportFailed, BulkInsertState.ImportFailedAndCleaned):
...     print("task id:", state.task_id, "failed, reason:", state.failed_reason)
"""
return _get_connection(using).get_bulk_insert_state(task_id, timeout=timeout, **kwargs)
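As a small follow-up sketch (an illustration, not part of this diff), a caller would typically poll get_bulk_insert_state() until the task is persisted or fails, much like wait_tasks_persisted() in the example file; it assumes task_id was returned by an earlier do_bulk_insert() call, and remember that a persisted task still needs the segment to be indexed and loaded before the data is queryable.

import time

from pymilvus import utility, BulkInsertState

# Poll the task state; only task_id, state, and failed_reason from the docstring above are used.
while True:
    state = utility.get_bulk_insert_state(task_id=task_id)  # task_id from do_bulk_insert()
    if state.state == BulkInsertState.ImportPersisted:
        print("task", state.task_id, "persisted")
        break
    if state.state in (BulkInsertState.ImportFailed, BulkInsertState.ImportFailedAndCleaned):
        print("task", state.task_id, "failed, reason:", state.failed_reason)
        break
    time.sleep(2)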

@@ -698,6 +716,11 @@ def list_bulk_insert_tasks(limit=0, collection_name=None, timeout=None, using="d
:return: list[BulkInsertState]
:rtype: list[BulkInsertState]

:example:
>>> from pymilvus import connections, utility, BulkInsertState
>>> connections.connect()
>>> tasks = utility.list_bulk_insert_tasks(collection_name=collection_name)
>>> print(tasks)
"""
return _get_connection(using).list_bulk_insert_tasks(limit, collection_name, timeout=timeout, **kwargs)
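And a short sketch (again illustrative, not from this diff) of listing recent tasks for a collection and printing the fields already shown in the docstrings above:

from pymilvus import connections, utility

connections.connect()
# List up to 10 recent bulk insert tasks of the collection used in the earlier docstring example.
tasks = utility.list_bulk_insert_tasks(limit=10, collection_name="test_collection_bulk_insert")
for task in tasks:
    print("task id:", task.task_id, "state:", task.state)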
