Example of insert rows
Signed-off-by: zhenshan.cao <[email protected]>
czs007 committed May 21, 2023
1 parent 0994731 commit 9377869
Showing 15 changed files with 705 additions and 324 deletions.
12 changes: 11 additions & 1 deletion pymilvus/client/abstract.py
@@ -69,6 +69,7 @@ def __init__(self, raw):
         self.indexes = []
         self.params = {}
         self.is_partition_key = False
+        self.is_dynamic = False
 
         ##
         self.__pack(self._raw)
@@ -81,6 +82,7 @@ def __pack(self, raw):
         self.auto_id = raw.autoID
         self.type = raw.data_type
         self.is_partition_key = raw.is_partition_key
+        self.is_dynamic = raw.is_dynamic
         # self.type = DataType(int(raw.type))
 
         for type_param in raw.type_params:
@@ -113,6 +115,7 @@ def dict(self):
             "is_primary": self.is_primary,
             "auto_id": self.auto_id,
             "is_partition_key": self.is_partition_key,
+            "is_dynamic": self.is_dynamic,
         }
         return _dict
 
@@ -134,6 +137,7 @@ def __init__(self, raw):
         self.properties = {}
         self.num_shards = 0
         self.num_partitions = 0
+        self.enable_dynamic_field = False
 
         #
         if self._raw:
@@ -153,6 +157,11 @@ def __pack(self, raw):
         except Exception:
             self.consistency_level = DEFAULT_CONSISTENCY_LEVEL
 
+        try:
+            self.enable_dynamic_field = raw.schema.enable_dynamic_field
+        except Exception:
+            self.enable_dynamic_field = False
+
         # self.params = dict()
         # TODO: extra_params here
         # for kv in raw.extra_params:
@@ -181,6 +190,7 @@ def dict(self):
             "consistency_level": self.consistency_level,
             "properties": self.properties,
             "num_partitions": self.num_partitions,
+            "enable_dynamic_field": self.enable_dynamic_field,
         }
         return _dict
 
@@ -381,7 +391,7 @@ def err_index(self):
 
     def __str__(self):
         return f"(insert count: {self._insert_cnt}, delete count: {self._delete_cnt}, upsert count: {self._upsert_cnt}, " \
-            f"timestamp: {self._timestamp}, success count: {self.succ_count}, err count: {self.err_count})"
+               f"timestamp: {self._timestamp}, success count: {self.succ_count}, err count: {self.err_count})"
 
     __repr__ = __str__
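
The flags added above surface in the plain dict that describe_collection returns (see the grpc_handler.py changes below, which read the same keys). A minimal sketch of inspecting them, assuming a connected GrpcHandler named handler and an existing collection "demo"; both are illustration-only assumptions, not part of this change:

# Assumed setup: `handler` is a connected GrpcHandler and "demo" exists.
info = handler.describe_collection("demo")

# Collection-level flag added by this commit; it falls back to False when
# the server response does not carry it.
print(info["enable_dynamic_field"])

# Per-field flag added by this commit, marking the hidden dynamic field.
for field in info["fields"]:
    print(field.get("name"), field["is_dynamic"])
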
24 changes: 18 additions & 6 deletions pymilvus/client/entity_helper.py
@@ -29,25 +29,37 @@ def check_str_arr(str_arr, max_len):
                 f"max length: 16,540")
 
 
-def entity_to_str_arr(entity, field_info, check=True):
+def convert_to_str_array(orig_str_arr, field_info, check=True):
     arr = []
     if Config.EncodeProtocol.lower() != 'utf-8'.lower():
-        for s in entity.get("values"):
+        for s in orig_str_arr:
             arr.append(s.encode(Config.EncodeProtocol))
     else:
-        arr = entity.get("values")
+        arr = orig_str_arr
     max_len = int(get_max_len_of_var_char(field_info))
     if check:
         check_str_arr(arr, max_len)
     return arr
 
-def entity_to_json_arr(entity):
+
+def entity_to_str_arr(entity, field_info, check=True):
+    return convert_to_str_array(entity.get("values", []), field_info, check=check)
+
+
+def convert_to_json(obj):
+    return ujson.dumps(obj).encode(Config.EncodeProtocol)
+
+def convert_to_json_arr(objs):
     arr = []
-    for obj in entity.get("values"):
+    for obj in objs:
         arr.append(ujson.dumps(obj).encode(Config.EncodeProtocol))
 
     return arr
 
 
+def entity_to_json_arr(entity):
+    return convert_to_json_arr(entity.get("values", []))
+
+
 # TODO: refactor here.
 def entity_to_field_data(entity, field_info):
     field_data = schema_types.FieldData()
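
This split separates the entity-dict entry points from plain-array converters, so the new row-based insert path can feed raw values straight to the converters. A rough usage sketch; the shape of field_info (a params dict carrying a max_length key) is an assumption about how VARCHAR fields are described, not something this diff shows:

from pymilvus.client.entity_helper import (
    convert_to_json, convert_to_json_arr, convert_to_str_array, entity_to_str_arr)

field_info = {"params": {"max_length": 64}}  # assumed VARCHAR field description

# The old entity-dict entry point is now a thin wrapper over the array form,
# so both spellings produce the same encoded array.
entity = {"values": ["a", "b"]}
assert entity_to_str_arr(entity, field_info) == convert_to_str_array(["a", "b"], field_info)

# JSON objects can be encoded one at a time or as a whole list.
assert convert_to_json_arr([{"k": 1}]) == [convert_to_json({"k": 1})]
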
64 changes: 60 additions & 4 deletions pymilvus/client/grpc_handler.py
@@ -143,7 +143,7 @@ def _setup_grpc_channel(self):
         else:
             if self._client_pem_path != "" and self._client_key_path != "" and self._ca_pem_path != "" \
                     and self._server_name != "":
-                opts.append(('grpc.ssl_target_name_override', self._server_name, ),)
+                opts.append(('grpc.ssl_target_name_override', self._server_name,), )
             with open(self._client_pem_path, 'rb') as f:
                 certificate_chain = f.read()
             with open(self._client_key_path, 'rb') as f:
@@ -356,7 +356,24 @@ def get_partition_stats(self, collection_name, partition_name, timeout=None, **kwargs):
 
         raise MilvusException(status.error_code, status.reason)
 
-    def _prepare_batch_insert_or_upsert_request(self, collection_name, entities, partition_name=None, timeout=None, isInsert=True, **kwargs):
+    def _prepare_row_insert_or_upsert_request(self, collection_name, rows, partition_name=None, timeout=None,
+                                              isInsert=True, **kwargs):
+        if not isinstance(rows, list):
+            raise ParamError(message="None rows, please provide valid row data.")
+
+        collection_schema = kwargs.get("schema", None)
+        if not collection_schema:
+            collection_schema = self.describe_collection(
+                collection_name, timeout=timeout, **kwargs)
+
+        fields_info = collection_schema["fields"]
+        enable_dynamic = collection_schema.get("enable_dynamic_field", False)
+        request = Prepare.row_insert_or_upsert_param(collection_name, rows, partition_name, fields_info, isInsert,
+                                                     enable_dynamic=enable_dynamic)
+        return request
+
+    def _prepare_batch_insert_or_upsert_request(self, collection_name, entities, partition_name=None, timeout=None,
+                                                isInsert=True, **kwargs):
         param = kwargs.get('insert_param', None)
 
         if not isInsert:
@@ -381,6 +398,24 @@ def _prepare_batch_insert_or_upsert_request(self, collection_name, entities, partition_name=None, timeout=None,
 
         return request
 
+    @retry_on_rpc_failure()
+    def insert_rows(self, collection_name, entities, partition_name=None, timeout=None, **kwargs):
+        if isinstance(entities, dict):
+            entities = [entities]
+        try:
+            request = self._prepare_row_insert_or_upsert_request(
+                collection_name, entities, partition_name, timeout, **kwargs)
+            rf = self._stub.Insert.future(request, timeout=timeout)
+            response = rf.result()
+            if response.status.error_code == 0:
+                m = MutationResult(response)
+                ts_utils.update_collection_ts(collection_name, m.timestamp)
+                return m
+
+            raise MilvusException(response.status.error_code, response.status.reason)
+        except Exception as err:
+            raise err
+
     @retry_on_rpc_failure()
     def batch_insert(self, collection_name, entities, partition_name=None, timeout=None, **kwargs):
         if not check_invalid_binary_vector(entities):
@@ -461,6 +496,25 @@ def upsert(self, collection_name, entities, partition_name=None, timeout=None, **kwargs):
             return MutationFuture(None, None, err)
         raise err
 
+    @retry_on_rpc_failure()
+    def upsert_rows(self, collection_name, entities, partition_name=None, timeout=None, **kwargs):
+        if isinstance(entities, dict):
+            entities = [entities]
+        try:
+            request = self._prepare_row_insert_or_upsert_request(
+                collection_name, entities, partition_name, timeout, False, **kwargs)
+            rf = self._stub.Upsert.future(request, timeout=timeout)
+            response = rf.result()
+            if response.status.error_code == 0:
+                m = MutationResult(response)
+                ts_utils.update_collection_ts(collection_name, m.timestamp)
+                return m
+
+            raise MilvusException(
+                response.status.error_code, response.status.reason)
+        except Exception as err:
+            raise err
+
     def _execute_search_requests(self, requests, timeout=None, **kwargs):
         auto_id = kwargs.get("auto_id", True)
 
@@ -694,7 +748,8 @@ def wait_for_creating_index(self, collection_name, index_name, timeout=None, **kwargs):
                 return False, fail_reason
             end = time.time()
             if isinstance(timeout, int) and end - start > timeout:
-                raise MilvusException(message=f"collection {collection_name} create index {index_name} timeout in {timeout}s")
+                raise MilvusException(
+                    message=f"collection {collection_name} create index {index_name} timeout in {timeout}s")
 
     @retry_on_rpc_failure()
     def load_collection(self, collection_name, replica_number=1, timeout=None, **kwargs):
@@ -786,7 +841,8 @@ def can_loop(t) -> bool:
             if progress >= 100:
                 return
             time.sleep(Config.WaitTimeDurationWhenLoad)
-        raise MilvusException(message=f"wait for loading partition timeout, collection: {collection_name}, partitions: {partition_names}")
+        raise MilvusException(
+            message=f"wait for loading partition timeout, collection: {collection_name}, partitions: {partition_names}")
 
     @retry_on_rpc_failure()
     def get_loading_progress(self, collection_name, partition_names=None, timeout=None):
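
Together with the helpers above, these entry points accept plain row dictionaries rather than column-oriented entity arrays, which is the point of this commit. A hedged sketch of calling them; the handler, collection, and field names are assumptions for illustration, and the collection setup does not appear in this diff:

# Assumed setup: `handler` is a connected GrpcHandler; "demo" was created with
# an INT64 primary key "id", a 2-dim FLOAT_VECTOR "embedding", and
# enable_dynamic_field=True.
rows = [
    {"id": 1, "embedding": [0.1, 0.2], "note": "lands in the dynamic field"},
    {"id": 2, "embedding": [0.3, 0.4]},
]

# insert_rows also accepts a single dict and wraps it in a list itself.
result = handler.insert_rows("demo", rows, timeout=10)
print(result.insert_count, result.timestamp)  # assuming the usual MutationResult accessors

# upsert_rows takes the same row shape and goes through the Upsert RPC.
handler.upsert_rows("demo", {"id": 1, "embedding": [0.5, 0.6]}, timeout=10)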