Skip to content

Commit

Permalink
test: Add a test for iterator enhancement (milvus-io#37296)
Browse files Browse the repository at this point in the history
related issue: milvus-io#37084

Signed-off-by: yanliang567 <[email protected]>
  • Loading branch information
yanliang567 authored Oct 30, 2024
1 parent 43ad9af commit 3a34046
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 35 deletions.
4 changes: 2 additions & 2 deletions tests/python_client/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pytest-parallel
pytest-random-order

# pymilvus
pymilvus==2.5.0rc104
pymilvus[bulk_writer]==2.5.0rc104
pymilvus==2.5.0rc106
pymilvus[bulk_writer]==2.5.0rc106

# for customize config test
python-benedict==0.24.3
Expand Down
20 changes: 10 additions & 10 deletions tests/python_client/testcases/test_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ def test_alias_drop_collection_by_alias(self):
check_items={exp_name: c_name, exp_schema: schema})
alias_name = cf.gen_unique_str(prefix)
self.utility_wrap.create_alias(collection_w.name, alias_name)
# collection_w.create_alias(alias_name)
collection_alias = self.init_collection_wrap(name=alias_name, schema=schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: alias_name,
Expand All @@ -427,7 +426,6 @@ def test_alias_drop_collection_by_alias(self):
collection_alias.drop(check_task=CheckTasks.err_res, check_items=error)

@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.xfail(reason="issue #36963")
def test_alias_reuse_alias_name_from_dropped_collection(self):
"""
target: test dropping a collection which has a alias
Expand Down Expand Up @@ -457,19 +455,23 @@ def test_alias_reuse_alias_name_from_dropped_collection(self):
res2 = self.utility_wrap.list_aliases(c_name)[0]
assert len(res2) == 0
# the same alias name can be reused for another collection
self.utility_wrap.create_alias(c_name, alias_name)
res2 = self.utility_wrap.list_aliases(c_name)[0]
assert len(res2) == 1
error = {ct.err_code: 999,
ct.err_msg: f"{alias_name} is alias to another collection: {collection_w.name}: alias already exist"}
self.utility_wrap.create_alias(c_name, alias_name,
check_task=CheckTasks.err_res,
check_items=error)
# res2 = self.utility_wrap.list_aliases(c_name)[0]
# assert len(res2) == 1

@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.xfail(reason="issue #36963")
def test_alias_rename_collection_to_alias_name(self):
"""
target: test renaming a collection to a alias name
method:
1.create a collection
2.create an alias for the collection
3.rename the collection to the alias name
3.rename the collection to the alias name no matter the collection was dropped or not
expected: in step 3, rename collection to alias name failed
"""
self._connect()
c_name = cf.gen_unique_str("collection")
Expand All @@ -479,15 +481,13 @@ def test_alias_rename_collection_to_alias_name(self):
alias_name = cf.gen_unique_str(prefix)
self.utility_wrap.create_alias(collection_w.name, alias_name)
error = {ct.err_code: 999,
ct.err_msg: f"duplicated new collection name default:{alias_name} with other collection name or alias"}
ct.err_msg: f"cannot rename collection to an existing alias: {alias_name}"}
self.utility_wrap.rename_collection(collection_w.name, alias_name,
check_task=CheckTasks.err_res, check_items=error)

collection_w.drop()
collection_w = self.init_collection_wrap(name=c_name, schema=default_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: default_schema})
error = {ct.err_code: 999,
ct.err_msg: f"this is not expected, any collection name or alias name shall be unique"}
self.utility_wrap.rename_collection(collection_w.name, alias_name,
check_task=CheckTasks.err_res, check_items=error)
67 changes: 55 additions & 12 deletions tests/python_client/testcases/test_mix_scenes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2392,25 +2392,25 @@ def test_search_pagination_group_by(self):
all_pages_grpby_field_values = []
for r in range(page_rounds):
page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
param=search_param, limit=limit, offset=limit * r,
expr=default_search_exp, group_by_field=grpby_field,
output_fields=[grpby_field],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": limit},
)[0]
param=search_param, limit=limit, offset=limit * r,
expr=default_search_exp, group_by_field=grpby_field,
output_fields=[grpby_field],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": limit},
)[0]
for j in range(limit):
all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field))
all_pages_ids += page_res[0].ids
hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3)
assert hit_rate >= 0.8

total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
param=search_param, limit=limit * page_rounds,
expr=default_search_exp, group_by_field=grpby_field,
output_fields=[grpby_field],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": limit * page_rounds}
)[0]
param=search_param, limit=limit * page_rounds,
expr=default_search_exp, group_by_field=grpby_field,
output_fields=[grpby_field],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": limit * page_rounds}
)[0]
hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids)))
hit_rate = round(hit_num / (limit * page_rounds), 3)
assert hit_rate >= 0.8
Expand Down Expand Up @@ -2473,3 +2473,46 @@ def test_search_pagination_group_size(self):
grpby_field_values.append(total_res[0][i].fields.get(grpby_field))
assert len(grpby_field_values) == total_count
assert len(set(grpby_field_values)) == limit * page_rounds

@pytest.mark.tags(CaseLabel.L2)
def test_search_group_size_min_max(self):
"""
verify search group by works with min and max group size
"""
group_by_field = self.inverted_string_field
default_search_field = self.vector_fields[1]
search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=self.vector_fields[1])
search_params = {}
limit = 10
max_group_size = 10
self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
param=search_params, limit=limit,
group_by_field=group_by_field,
group_size=max_group_size, group_strict_size=True,
output_fields=[group_by_field])
exceed_max_group_size = max_group_size + 1
error = {ct.err_code: 999,
ct.err_msg: f"input group size:{exceed_max_group_size} exceeds configured max "
f"group size:{max_group_size}"}
self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
param=search_params, limit=limit,
group_by_field=group_by_field,
group_size=exceed_max_group_size, group_strict_size=True,
output_fields=[group_by_field],
check_task=CheckTasks.err_res, check_items=error)

min_group_size = 1
self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
param=search_params, limit=limit,
group_by_field=group_by_field,
group_size=max_group_size, group_strict_size=True,
output_fields=[group_by_field])
below_min_group_size = min_group_size - 1
error = {ct.err_code: 999,
ct.err_msg: f"input group size:{below_min_group_size} is negative"}
self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
param=search_params, limit=limit,
group_by_field=group_by_field,
group_size=below_min_group_size, group_strict_size=True,
output_fields=[group_by_field],
check_task=CheckTasks.err_res, check_items=error)
56 changes: 45 additions & 11 deletions tests/python_client/testcases/test_query_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_query_iterator_normal(self, primary_field, with_growing):
3. query iterator with checkpoint file
4. iterator.next() for 10 times
5. delete some entities before calling a new query iterator
6. call a new query iterator with the same checkpoint file
6. call a new query iterator with the same checkpoint file, with diff batch_size and output_fields
7. iterator.next() until the end
verify:
1. no pk lost in interator results for the 2 iterators
Expand Down Expand Up @@ -59,16 +59,14 @@ def test_query_iterator_normal(self, primary_field, with_growing):
iterator.close()
assert False, f"The iterator ends before {first_iter_times} times iterators: iter_times: {iter_times}"
break
pk_name = ct.default_int64_field_name if res[0].get(ct.default_int64_field_name, None) is not None \
else ct.default_string_field_name
for i in range(len(res)):
pk_list1.append(res[i][pk_name])
pk_list1.append(res[i][primary_field])
file_exist = os.path.isfile(iterator_cp_file)
assert file_exist is True, "The checkpoint file exists without iterator close"

# 4. try to delete and insert some entities before calling a new query iterator
delete_ids = random.sample(insert_ids[:nb//2], 101) + random.sample(insert_ids[nb//2:], 101)
del_res, _ = collection_w.delete(expr=f"{pk_name} in {delete_ids}")
del_res, _ = collection_w.delete(expr=f"{primary_field} in {delete_ids}")
assert del_res.delete_count == len(delete_ids)

data = cf.gen_default_list_data(nb=333, start=nb)
Expand All @@ -77,18 +75,16 @@ def test_query_iterator_normal(self, primary_field, with_growing):
collection_w.flush()

# 5. call a new query iterator with the same checkpoint file to continue the first iterator
iterator2 = collection_w.query_iterator(batch_size, expr=expr, iterator_cp_file=iterator_cp_file)[0]
pk_list2 = []
iterator2 = collection_w.query_iterator(batch_size*2, expr=expr,
output_fields=[primary_field, ct.default_float_field_name],
iterator_cp_file=iterator_cp_file)[0]
while True:
res = iterator2.next()
if len(res) == 0:
iterator2.close()
break
pk_name = ct.default_int64_field_name if res[0].get(ct.default_int64_field_name, None) is not None \
else ct.default_string_field_name
for i in range(len(res)):
pk_list2.append(res[i][pk_name])
pk_list1.append(res[i][pk_name])
pk_list1.append(res[i][primary_field])
# 6. verify
assert len(pk_list1) == len(set(pk_list1)) == nb
file_exist = os.path.isfile(iterator_cp_file)
Expand Down Expand Up @@ -299,3 +295,41 @@ def test_query_iterator_with_dup_pk(self, primary_field):
collection_w.query_iterator(check_task=CheckTasks.check_query_iterator,
check_items={"count": nb,
"batch_size": ct.default_batch_size})

@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("issue #37109, need debug due to the resolution of the issue")
def test_query_iterator_on_two_collections(self):
"""
target: test query iterator on two collections
method: 1. create two collections
2. query iterator on the first collection
3. check the result, expect pk
expected: query successfully
"""
# 1. initialize with data
collection_w = self.init_collection_general(prefix, True)[0]
collection_w2 = self.init_collection_general(prefix, False, primary_field=ct.default_string_field_name)[0]

data = cf.gen_default_list_data(nb=ct.default_nb, primary_field=ct.default_string_field_name)
string_values = [cf.gen_str_by_length(20) for _ in range(ct.default_nb)]
data[2] = string_values
collection_w2.insert(data)

# 2. call a new query iterator and iterator for some times
batch_size = 150
iterator_cp_file = f"/tmp/it_{collection_w.name}_cp"
iterator2 = collection_w2.query_iterator(batch_size=batch_size // 2, iterator_cp_file=iterator_cp_file)[0]
iter_times = 0
first_iter_times = ct.default_nb // batch_size // 2 // 2 # only iterate half of the data for the 1st time
while iter_times < first_iter_times:
iter_times += 1
res = iterator2.next()
if len(res) == 0:
iterator2.close()
assert False, f"The iterator ends before {first_iter_times} times iterators: iter_times: {iter_times}"
break

# 3. query iterator on the second collection with the same checkpoint file

iterator = collection_w.query_iterator(batch_size=batch_size, iterator_cp_file=iterator_cp_file)[0]
print(iterator.next())

0 comments on commit 3a34046

Please sign in to comment.