Commit

Only allow one file for row-based import (#1215)
Signed-off-by: yhmo <[email protected]>

yhmo authored Nov 1, 2022
1 parent d4ed64f commit 7be2a67
Showing 6 changed files with 73 additions and 36 deletions.
29 changes: 17 additions & 12 deletions examples/example_bulkload.py
@@ -14,8 +14,8 @@
# This example shows how to:
# 1. connect to Milvus server
# 2. create a collection
# 3. create some json files for bulk_insert operation
# 4. do bulk_insert
# 3. create some json files for do_bulk_insert operation
# 4. call do_bulk_insert
# 5. search

# To run this example, start a standalone(local storage) milvus with the following configurations, in the milvus.yml:
@@ -99,7 +99,7 @@ def create_partition(collection, partition_name):
# Generate a json file with row-based data.
# The json file must contain a root key "rows"; its value is a list, and each row must contain a value for each field.
# No need to provide the auto-id field "id_field" since milvus will generate it.
# The row-based json file looks like this:
# The row-based json file looks like:
# {"rows": [
# {"str_field": "row-based_0", "float_vector_field": [0.190, 0.046, 0.143, 0.972, 0.592, 0.238, 0.266, 0.995]},
# {"str_field": "row-based_1", "float_vector_field": [0.149, 0.586, 0.012, 0.673, 0.588, 0.917, 0.949, 0.944]},
@@ -124,7 +124,7 @@ def gen_json_rowbased(num, path, tag):
# Bulkload for row-based files; each file is converted to a task.
# The rootcoord maintains a task list, and each idle datanode will receive a task. If no datanode is available, the task will
# be put into a pending list to wait; the max size of the pending list is 32. If the count of new tasks exceeds the spare capacity of the
# pending list, the bulk_insert() method will return an error.
# pending list, the do_bulk_insert() method will return an error.
# Once a task is finished, the datanode becomes idle and will receive another task.
#
# The max size of each file is 1GB; if a file is larger than 1GB, the task will fail and you will get an error
@@ -143,13 +143,16 @@ def bulk_insert_rowbased(row_count_each_file, file_count, tag, partition_name =
file_names = []
for i in range(file_count):
file_names.append("rows_" + str(i) + ".json")

task_ids = []
for filename in file_names:
print("Generate row-based file:", MILVUS_DATA_PATH + filename)
gen_json_rowbased(row_count_each_file, MILVUS_DATA_PATH + filename, tag)

print("Bulkload row-based files:", file_names)
task_ids = utility.bulk_insert(collection_name=_COLLECTION_NAME,
partition_name=partition_name,
files=file_names)
print("Import row-based file:", filename)
task_id = utility.do_bulk_insert(collection_name=_COLLECTION_NAME,
partition_name=partition_name,
files=[filename])
task_ids.append(task_id)
return wait_tasks_persisted(task_ids)
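For reference, wait_tasks_persisted (not shown in this hunk) polls each returned task id until its data is persisted. A simplified sketch, assuming the BulkInsertState constants (ImportPersisted, ImportCompleted, ImportFailed) exposed in pymilvus.client.types:

    import time
    from pymilvus import utility
    from pymilvus.client.types import BulkInsertState

    def wait_task_persisted(task_id, poll_interval=2):
        # poll the task until the imported data is persisted (or the task fails)
        while True:
            state = utility.get_bulk_insert_state(task_id=task_id)
            if state.state == BulkInsertState.ImportFailed:
                raise RuntimeError("bulk insert task failed: {}".format(state))
            if state.state in (BulkInsertState.ImportPersisted, BulkInsertState.ImportCompleted):
                return state
            time.sleep(poll_interval)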

# wait for all bulk insert tasks to reach a certain state
@@ -217,10 +220,11 @@ def wait_tasks_competed(task_ids):
return states

# List all bulk insert tasks, including pending tasks, working tasks and finished tasks.
# the parameter 'limit' is: how many latest tasks should be returned
# the parameter 'limit' is: how many of the latest tasks should be returned; if limit<=0, all the tasks will be returned
def list_all_bulk_insert_tasks(limit):
tasks = utility.list_bulk_insert_tasks(limit)
print("=========================================================================================================")
print("list bulk insert tasks with limit", limit)
pending = 0
started = 0
persisted = 0
@@ -359,24 +363,25 @@ def main():
# search in entire collection
vector = [round(random.random(), 6) for _ in range(_DIM)]
vectors = [vector]
print("Use a random vector to search in entire collection")
search(collection, _VECTOR_FIELD_NAME, vectors)

# search in a partition
print("Use a random vector to search in partition:", a_partition)
search(collection, _VECTOR_FIELD_NAME, vectors, partition_name=a_partition)

# pick some entities to delete
delete_ids = []
for task in tasks:
delete_ids.append(task.ids[5])
id_vectors = retrieve(collection, delete_ids)
print("Delete theses entities:", id_vectors)
delete(collection, delete_ids)

# search the deleted entities to check existence, check the top0 of the search result
for id_vector in id_vectors:
id = id_vector[_ID_FIELD_NAME]
vector = id_vector[_VECTOR_FIELD_NAME]
print("Search id:", id, ", compare this id to the top0 of search result")
print("Search id:", id, ", compare this id to the top0 of search result, the entity with the id has been deleted")
# here we use Strong consistency level to do the search, because we need to make sure the delete operation is applied
search(collection, _VECTOR_FIELD_NAME, [vector], partition_name=None, consistency_level="Strong")
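The search() helper is defined earlier in the example; the underlying call it wraps looks roughly like the sketch below (the metric type and search params here are illustrative, not necessarily the ones this example uses; collection, vector and _VECTOR_FIELD_NAME are the example's own names):

    # Strong consistency makes the preceding delete visible to the search
    results = collection.search(
        data=[vector],
        anns_field=_VECTOR_FIELD_NAME,
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=5,
        consistency_level="Strong",
    )
    top0 = results[0][0]
    print("top0 id:", top0.id, "distance:", top0.distance)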

2 changes: 1 addition & 1 deletion pymilvus/__init__.py
@@ -41,7 +41,7 @@
load_balance,
mkts_from_hybridts, mkts_from_unixtime, mkts_from_datetime,
hybridts_to_unixtime, hybridts_to_datetime,
bulk_insert, get_bulk_insert_state, list_bulk_insert_tasks,
do_bulk_insert, get_bulk_insert_state, list_bulk_insert_tasks,
reset_password, create_user, update_password, delete_user, list_usernames,
)
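With this change the package root exports do_bulk_insert instead of bulk_insert, so caller imports change accordingly; a minimal sketch:

    # previously: from pymilvus import bulk_insert
    from pymilvus import do_bulk_insert, utility

    # the same function is also reachable as utility.do_bulk_insert(...)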

8 changes: 5 additions & 3 deletions pymilvus/client/grpc_handler.py
@@ -1027,13 +1027,15 @@ def get_replicas(self, collection_name, timeout=None, **kwargs) -> Replica:
return Replica(groups)

@retry_on_rpc_failure()
def bulk_insert(self, collection_name, partition_name, files: list, timeout=None, **kwargs) -> list:
req = Prepare.bulk_insert(collection_name, partition_name, files, **kwargs)
def do_bulk_insert(self, collection_name, partition_name, files: list, timeout=None, **kwargs) -> int:
req = Prepare.do_bulk_insert(collection_name, partition_name, files, **kwargs)
future = self._stub.Import.future(req, timeout=timeout)
response = future.result()
if response.status.error_code != 0:
raise MilvusException(response.status.error_code, response.status.reason)
return response.tasks
if len(response.tasks) == 0:
raise MilvusException(common_pb2.UNEXPECTED_ERROR, "no task id returned from server")
return response.tasks[0]

@retry_on_rpc_failure()
def get_bulk_insert_state(self, task_id, timeout=None, **kwargs) -> BulkInsertState:
2 changes: 1 addition & 1 deletion pymilvus/client/prepare.py
@@ -681,7 +681,7 @@ def get_replicas(cls, collection_id: int):
return request

@classmethod
def bulk_insert(cls, collection_name: str, partition_name: str, files: list, **kwargs):
def do_bulk_insert(cls, collection_name: str, partition_name: str, files: list, **kwargs):
channel_names = kwargs.get("channel_names", None)
req = milvus_types.ImportRequest(
collection_name=collection_name,
41 changes: 29 additions & 12 deletions pymilvus/client/stub.py
@@ -1046,35 +1046,47 @@ def get_replicas(self, collection_name: str, timeout=None, **kwargs) -> Replica:
with self._connection() as handler:
return handler.get_replicas(collection_name, timeout=timeout, **kwargs)

def bulk_insert(self, collection_name: str, partition_name: str, files: list, timeout=None, **kwargs) -> list:
"""
Bulk insert entities through files
def do_bulk_insert(self, collection_name: str, partition_name: str, files: list, timeout=None, **kwargs) -> int:
""" do_bulk_insert inserts entities through files, currently supports row-based json file.
User need to create the json file with a specified json format which is described in the official user guide.
Let's say a collection has two fields: "id" and "vec"(dimension=8), the row-based json format is:
{"rows": [
{"id": "0", "vec": [0.190, 0.046, 0.143, 0.972, 0.592, 0.238, 0.266, 0.995]},
{"id": "1", "vec": [0.149, 0.586, 0.012, 0.673, 0.588, 0.917, 0.949, 0.944]},
......
]
}
The json file must be uploaded to the root path of the MinIO/S3 storage that is accessed by the milvus server.
For example:
the milvus.yml specifies the MinIO/S3 storage bucketName as "a-bucket", the user can upload the json file
to a-bucket/xxx.json, then call do_bulk_insert(files=["a-bucket/xxx.json"])
:param collection_name: the name of the collection
:type collection_name: str
:param partition_name: the name of the partition
:type partition_name: str
:param files: file names to bulk load
:param files: relative path of the file to be imported; for a row-based json file, only
one file is allowed per invocation.
:type files: list[str]
:param timeout: The timeout for this method, unit: second
:type timeout: int
:param kwargs: other infos
:return: ids of tasks
:rtype: list[int]
:return: id of the task
:rtype: int
:raises MilvusException: If collection_name doesn't exist.
:raises BaseException: If collection_name doesn't exist.
:raises BaseException: If the files input is illegal.
"""
with self._connection() as handler:
return handler.bulk_insert(collection_name, partition_name, files, timeout=timeout, **kwargs)
return handler.do_bulk_insert(collection_name, partition_name, files, timeout=timeout, **kwargs)

def get_bulk_insert_state(self, task_id, timeout=None, **kwargs) -> BulkInsertState:
"""
Get state of a certain task_id
"""get_bulk_insert_state returns state of a certain task_id
:param task_id: the task id returned by bulk_insert
:type task_id: int
@@ -1086,8 +1098,13 @@ def get_bulk_insert_state(self, task_id, timeout=None, **kwargs) -> BulkInsertSt
return handler.get_bulk_insert_state(task_id, timeout=timeout, **kwargs)

def list_bulk_insert_tasks(self, timeout=None, **kwargs) -> list:
"""
Lists all bulk insert tasks
"""list_bulk_insert_tasks lists all bulk load tasks
:param limit: maximum number of tasks returned; list all tasks if the value is 0, otherwise return the latest tasks
:type limit: int
:param collection_name: target collection name, list all tasks if the name is empty
:type collection_name: str
:return: list[BulkInsertState]
:rtype: list[BulkInsertState]
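A short usage sketch for the listing API described above (the connection details, collection name and limit are placeholders, and it is assumed that the returned BulkInsertState objects expose task_id and state_name as in the client types):

    from pymilvus import connections, utility

    connections.connect(host="localhost", port="19530")
    # return up to the 10 latest tasks of one collection; limit=0 would list them all
    tasks = utility.list_bulk_insert_tasks(limit=10, collection_name="demo_collection")
    for t in tasks:
        print(t.task_id, t.state_name)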
27 changes: 20 additions & 7 deletions pymilvus/orm/utility.py
@@ -636,29 +636,42 @@ def list_aliases(collection_name: str, timeout=None, using="default"):
return aliases


def bulk_insert(collection_name: str, files: list, partition_name=None, timeout=None, using="default", **kwargs) -> list:
""" bulk_insert inserts entities through files, currently supports row-based json file and column-based numpy files
def do_bulk_insert(collection_name: str, files: list, partition_name=None, timeout=None, using="default", **kwargs) -> int:
""" do_bulk_insert inserts entities through files, currently supports row-based json file.
User need to create the json file with a specified json format which is described in the official user guide.
Let's say a collection has two fields: "id" and "vec"(dimension=8), the row-based json format is:
{"rows": [
{"id": "0", "vec": [0.190, 0.046, 0.143, 0.972, 0.592, 0.238, 0.266, 0.995]},
{"id": "1", "vec": [0.149, 0.586, 0.012, 0.673, 0.588, 0.917, 0.949, 0.944]},
......
]
}
The json file must be uploaded to the root path of the MinIO/S3 storage that is accessed by the milvus server.
For example:
the milvus.yml specifies the MinIO/S3 storage bucketName as "a-bucket", the user can upload the json file
to a-bucket/xxx.json, then call do_bulk_insert(files=["a-bucket/xxx.json"])
:param collection_name: the name of the collection
:type collection_name: str
:param partition_name: the name of the partition
:type partition_name: str
:param files: file names to bulk load
:param files: relative path of the file to be imported; for a row-based json file, only one file is allowed per invocation.
:type files: list[str]
:param timeout: The timeout for this method, unit: second
:type timeout: int
:param kwargs: other infos
:return: ids of tasks
:rtype: list[int]
:return: id of the task
:rtype: int
:raises BaseException: If collection_name doesn't exist.
:raises BaseException: If the files input is illegal.
"""
return _get_connection(using).bulk_insert(collection_name, partition_name, files, timeout=timeout, **kwargs)
return _get_connection(using).do_bulk_insert(collection_name, partition_name, files, timeout=timeout, **kwargs)
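A minimal end-to-end sketch of the call documented above (server address, collection name and file path are placeholders, and state_name is assumed to be available on the returned BulkInsertState):

    from pymilvus import connections, utility

    connections.connect(host="localhost", port="19530")
    # one row-based json file per call; the path follows the bucket layout described above
    task_id = utility.do_bulk_insert(collection_name="demo_collection",
                                     files=["a-bucket/xxx.json"])
    state = utility.get_bulk_insert_state(task_id=task_id)
    print("task", task_id, "state:", state.state_name)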


def get_bulk_insert_state(task_id, timeout=None, using="default", **kwargs) -> BulkInsertState:
Expand All @@ -676,7 +689,7 @@ def get_bulk_insert_state(task_id, timeout=None, using="default", **kwargs) -> B
def list_bulk_insert_tasks(limit=0, collection_name=None, timeout=None, using="default", **kwargs) -> list:
"""list_bulk_insert_tasks lists all bulk load tasks
:param limit: maximum number of tasks returned, list all tasks if the value is 0
:param limit: maximum number of tasks returned; list all tasks if the value is 0, otherwise return the latest tasks
:type limit: int
:param collection_name: target collection name, list all tasks if the name is empty
