fix: fix query count(*) concurrently (#35007)
#34778
#34849
Fix two problems:
1. count(*) was incorrect when a growing segment inserted duplicate (pk, timestamp)
pairs with identical pk and timestamp; only one such pair must be kept.
2. count(*) could core dump when the get_real_count interface fetched a snapshot and
applied MVCC in an inconsistent state, which mainly happened under concurrency.

Signed-off-by: luzhang <[email protected]>
Co-authored-by: luzhang <[email protected]>
zhagnlu and luzhang authored Jul 29, 2024
1 parent 9727522 commit 86322e0
Showing 9 changed files with 91 additions and 35 deletions.
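
Before the diffs, a standalone sketch of the rule behind the first fix: when a delete at (pk, ts) is applied, matching insert rows with an older timestamp are removed, and among rows carrying exactly the same (pk, timestamp) only the first occurrence is kept. Names here (Row, RowsToDelete) are illustrative, not the segcore API:

#include <cstdint>
#include <iostream>
#include <vector>

struct Row {
    int64_t pk;
    uint64_t ts;
};

// Returns the offsets a delete at (pk, ts) should mark as removed.
std::vector<size_t>
RowsToDelete(const std::vector<Row>& rows, int64_t pk, uint64_t ts) {
    std::vector<size_t> victims;
    bool kept_one = false;  // the first exact (pk, ts) match survives
    for (size_t off = 0; off < rows.size(); ++off) {
        if (rows[off].pk != pk) {
            continue;
        }
        if (rows[off].ts < ts) {
            victims.push_back(off);  // older row: always removed
        } else if (rows[off].ts == ts) {
            if (kept_one) {
                victims.push_back(off);  // duplicate pair: removed
            }
            kept_one = true;
        }
    }
    return victims;
}

int main() {
    // pk 7 inserted three times with the identical timestamp 5
    std::vector<Row> rows = {{7, 5}, {7, 5}, {7, 5}};
    std::cout << RowsToDelete(rows, 7, 5).size() << "\n";  // 2, so count(*) == 1
}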
39 changes: 31 additions & 8 deletions internal/core/src/segcore/DeletedRecord.h
@@ -58,15 +58,29 @@ class DeletedRecord {
         int64_t removed_num = 0;
         int64_t mem_add = 0;
         for (size_t i = 0; i < pks.size(); ++i) {
-            auto offsets = insert_record_->search_pk(pks[i], timestamps[i]);
+            auto delete_pk = pks[i];
+            auto delete_timestamp = timestamps[i];
+            auto offsets =
+                insert_record_->search_pk(delete_pk, delete_timestamp);
+            bool has_duplicate_pk_timestamps = false;
             for (auto offset : offsets) {
-                int64_t insert_row_offset = offset.get();
-                // Assert(insert_record->timestamps_.size() >= insert_row_offset);
-                if (insert_record_->timestamps_[insert_row_offset] <
-                    timestamps[i]) {
-                    InsertIntoInnerPairs(timestamps[i], {insert_row_offset});
+                int64_t row_offset = offset.get();
+                auto row_timestamp = insert_record_->timestamps_[row_offset];
+                // Assert(insert_record->timestamps_.size() >= row_offset);
+                if (row_timestamp < delete_timestamp) {
+                    InsertIntoInnerPairs(delete_timestamp, {row_offset});
                     removed_num++;
                     mem_add += sizeof(Timestamp) + sizeof(int64_t);
+                } else if (row_timestamp == delete_timestamp) {
+                    // if the insert record holds multiple identical (pk, timestamp)
+                    // pairs, keep only the first one and remove the rest
+                    if (!has_duplicate_pk_timestamps) {
+                        has_duplicate_pk_timestamps = true;
+                    } else {
+                        InsertIntoInnerPairs(delete_timestamp, {row_offset});
+                        removed_num++;
+                        mem_add += sizeof(Timestamp) + sizeof(int64_t);
+                    }
                 }
             }
         }
@@ -84,15 +98,24 @@
         auto end = deleted_pairs_.lower_bound(
             std::make_pair(timestamp, std::set<int64_t>{}));
         for (auto it = deleted_pairs_.begin(); it != end; it++) {
+            // this may happen when lower_bound returns deleted_pairs_.end()
+            // while other threads insert nodes into deleted_pairs_ concurrently
+            if (it->first > timestamp) {
+                break;
+            }
             for (auto& v : it->second) {
-                bitset.set(v);
+                if (v < insert_barrier) {
+                    bitset.set(v);
+                }
             }
         }

         // handle the case where end points to an element with the same timestamp
         if (end != deleted_pairs_.end() && end->first == timestamp) {
             for (auto& v : end->second) {
-                bitset.set(v);
+                if (v < insert_barrier) {
+                    bitset.set(v);
+                }
             }
         }
     }
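
The `v < insert_barrier` guard above is the heart of the second fix: get_real_count builds a delete bitset sized by an insert barrier captured at snapshot time, so a delete offset appended by a concurrent insert would index past the end of that bitset. A minimal sketch of the invariant, assuming a plain std::vector<bool> in place of the segcore bitset and a hypothetical RealCount helper:

#include <cstdint>
#include <vector>

// Sketch only: counts live rows under a snapshot whose size is
// insert_barrier; offsets appended after the barrier are ignored.
int64_t
RealCount(int64_t insert_barrier, const std::vector<int64_t>& deleted_offsets) {
    std::vector<bool> bitset(insert_barrier, false);
    for (int64_t off : deleted_offsets) {
        if (off < insert_barrier) {  // without this check the write below
            bitset[off] = true;      // goes out of range, i.e. the core dump
        }
    }
    int64_t deleted = 0;
    for (bool b : bitset) {
        deleted += b;
    }
    return insert_barrier - deleted;
}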
2 changes: 1 addition & 1 deletion internal/core/unittest/test_array_expr.cpp
@@ -701,7 +701,7 @@ TEST(Expr, TestArrayEqual) {
     std::vector<ScalarArray> long_array_col;
     int num_iters = 1;
     for (int iter = 0; iter < num_iters; ++iter) {
-        auto raw_data = DataGen(schema, N, iter, 0, 1, 3);
+        auto raw_data = DataGen(schema, N, iter, 0, 0, 1, 3);
         auto new_long_array_col = raw_data.get_col<ScalarArray>(long_array_fid);
         long_array_col.insert(long_array_col.end(),
                               new_long_array_col.begin(),
6 changes: 3 additions & 3 deletions internal/core/unittest/test_c_api.cpp
@@ -936,7 +936,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
     auto col = (milvus::segcore::Collection*)collection;

     int N = 20;
-    auto dataset = DataGen(col->get_schema(), N, 42, 0, 2);
+    auto dataset = DataGen(col->get_schema(), N, 42, 0, 0, 2);

     auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);
     auto sealed_segment = dynamic_cast<SegmentSealed*>(segment_interface);
@@ -1156,7 +1156,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {

     // second insert data
     // insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} , timestamps = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
-    dataset = DataGen(col->get_schema(), N, 42, N);
+    dataset = DataGen(col->get_schema(), N, 42, 0, N);
     insert_data = serialize(dataset.raw_);
     PreInsert(segment, N, &offset);
     res = Insert(segment,
@@ -1194,7 +1194,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
     auto col = (milvus::segcore::Collection*)collection;

     int N = 10;
-    auto dataset = DataGen(col->get_schema(), N, 42, 0, 2);
+    auto dataset = DataGen(col->get_schema(), N, 42, 0, 0, 2);

     // insert data with pks = {0, 0, 1, 1, 2, 2, 3, 3, 4, 4} , timestamps = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
     auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);
4 changes: 2 additions & 2 deletions internal/core/unittest/test_c_stream_reduce.cpp
@@ -29,7 +29,7 @@ TEST(CApiTest, StreamReduce) {

     //2. insert data into segments
     auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
-    auto dataset_1 = DataGen(schema, N, 55, 0, 1, 10, true);
+    auto dataset_1 = DataGen(schema, N, 55, 0, 0, 1, 10, true);
     int64_t offset_1;
     PreInsert(segment_1, N, &offset_1);
     auto insert_data_1 = serialize(dataset_1.raw_);
@@ -42,7 +42,7 @@
                          insert_data_1.size());
     ASSERT_EQ(ins_res_1.error_code, Success);

-    auto dataset_2 = DataGen(schema, N, 66, 0, 1, 10, true);
+    auto dataset_2 = DataGen(schema, N, 66, 0, 0, 1, 10, true);
     int64_t offset_2;
     PreInsert(segment_2, N, &offset_2);
     auto insert_data_2 = serialize(dataset_2.raw_);
14 changes: 7 additions & 7 deletions internal/core/unittest/test_group_by.cpp
@@ -95,7 +95,7 @@ TEST(GroupBY, SealedIndex) {
     size_t N = 50;

     //2. load raw data
-    auto raw_data = DataGen(schema, N, 42, 0, 8, 10, false, false);
+    auto raw_data = DataGen(schema, N, 42, 0, 0, 8, 10, false, false);
     auto fields = schema->get_fields();
     for (auto field_data : raw_data.raw_->fields_data()) {
         int64_t field_id = field_data.field_id();
@@ -447,7 +447,7 @@ TEST(GroupBY, SealedData) {
     size_t N = 100;

     //2. load raw data
-    auto raw_data = DataGen(schema, N, 42, 0, 8, 10, false, false);
+    auto raw_data = DataGen(schema, N, 42, 0, 0, 8, 10, false, false);
     auto fields = schema->get_fields();
     for (auto field_data : raw_data.raw_->fields_data()) {
         int64_t field_id = field_data.field_id();
@@ -542,9 +542,9 @@ TEST(GroupBY, Reduce) {
     int repeat_count_1 = 2;
     int repeat_count_2 = 5;
     auto raw_data1 =
-        DataGen(schema, N, seed, ts_offset, repeat_count_1, false, false);
+        DataGen(schema, N, seed, 0, ts_offset, repeat_count_1, false, false);
     auto raw_data2 =
-        DataGen(schema, N, seed, ts_offset, repeat_count_2, false, false);
+        DataGen(schema, N, seed, 0, ts_offset, repeat_count_2, false, false);

     auto fields = schema->get_fields();
     //load segment1 raw data
@@ -676,7 +676,7 @@ TEST(GroupBY, GrowingRawData) {
     int n_batch = 3;
     for (int i = 0; i < n_batch; i++) {
         auto data_set =
-            DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false);
+            DataGen(schema, rows_per_batch, 42, 0, 0, 8, 10, false, false);
         auto offset = segment_growing_impl->PreInsert(rows_per_batch);
         segment_growing_impl->Insert(offset,
                                      rows_per_batch,
@@ -774,9 +774,9 @@ TEST(GroupBY, GrowingIndex) {
     int64_t rows_per_batch = 1024;
     int n_batch = 10;
     for (int i = 0; i < n_batch; i++) {
-        auto data_set =
-            DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false);
         auto offset = segment_growing_impl->PreInsert(rows_per_batch);
+        auto data_set = DataGen(
+            schema, rows_per_batch, 42, offset, offset, 1, 10, false, false);
         segment_growing_impl->Insert(offset,
                                      rows_per_batch,
                                      data_set.row_ids_.data(),
30 changes: 29 additions & 1 deletion internal/core/unittest/test_growing.cpp
@@ -59,7 +59,7 @@ TEST(Growing, RemoveDuplicatedRecords) {
     int64_t c = 1000;
     auto offset = 0;

-    auto dataset = DataGen(schema, c, 42, 0, 1, 10, true);
+    auto dataset = DataGen(schema, c, 42, 0, 0, 1, 10, true);
     auto pks = dataset.get_col<int64_t>(pk);
     segment->Insert(offset,
                     c,
@@ -109,6 +109,34 @@
     }
 }

+TEST(Growing, RealCountWithDuplicateRecords) {
+    auto schema = std::make_shared<Schema>();
+    auto pk = schema->AddDebugField("pk", DataType::INT64);
+    schema->set_primary_field_id(pk);
+    auto segment = CreateGrowingSegment(schema, empty_index_meta);
+
+    int64_t c = 10;
+    auto offset = 0;
+    auto dataset = DataGen(schema, c);
+    auto pks = dataset.get_col<int64_t>(pk);
+
+    // insert same values twice
+    segment->Insert(offset,
+                    c,
+                    dataset.row_ids_.data(),
+                    dataset.timestamps_.data(),
+                    dataset.raw_);
+
+    segment->Insert(offset + c,
+                    c,
+                    dataset.row_ids_.data(),
+                    dataset.timestamps_.data(),
+                    dataset.raw_);
+
+    // real count is still c not 2c
+    ASSERT_EQ(c, segment->get_real_count());
+}
+
 TEST(Growing, RealCount) {
     auto schema = std::make_shared<Schema>();
     auto pk = schema->AddDebugField("pk", DataType::INT64);
2 changes: 1 addition & 1 deletion internal/core/unittest/test_retrieve.cpp
@@ -383,7 +383,7 @@ TEST_P(RetrieveTest, LargeTimestamp) {
     int choose_sep = 3;
     auto choose = [=](int i) { return i * choose_sep % N; };
     uint64_t ts_offset = 100;
-    auto dataset = DataGen(schema, N, 42, ts_offset + 1);
+    auto dataset = DataGen(schema, N, 42, 0, ts_offset + 1);
     auto segment = CreateSealedSegment(schema);
     SealedLoadFieldData(dataset, *segment);
     auto i64_col = dataset.get_col<int64_t>(fid_64);
27 changes: 16 additions & 11 deletions internal/core/unittest/test_utils/DataGen.h
@@ -248,6 +248,7 @@ struct GeneratedData {
     DataGen(SchemaPtr schema,
             int64_t N,
             uint64_t seed,
+            uint64_t pk_offset,
             uint64_t ts_offset,
             int repeat_count,
             int array_len,
@@ -317,14 +318,16 @@ GenerateRandomSparseFloatVector(size_t rows,
     return tensor;
 }

-inline GeneratedData DataGen(SchemaPtr schema,
-                             int64_t N,
-                             uint64_t seed = 42,
-                             uint64_t ts_offset = 0,
-                             int repeat_count = 1,
-                             int array_len = 10,
-                             bool random_pk = false,
-                             bool random_val = true) {
+inline GeneratedData
+DataGen(SchemaPtr schema,
+        int64_t N,
+        uint64_t seed = 42,
+        uint64_t pk_offset = 0,
+        uint64_t ts_offset = 0,
+        int repeat_count = 1,
+        int array_len = 10,
+        bool random_pk = false,
+        bool random_val = true) {
     using std::vector;
     std::default_random_engine random(seed);
     std::normal_distribution<> distr(0, 1);
@@ -425,9 +428,11 @@ inline GeneratedData DataGen(SchemaPtr schema,
             case DataType::INT64: {
                 vector<int64_t> data(N);
                 for (int i = 0; i < N; i++) {
-                    if (random_pk && schema->get_primary_field_id()->get() ==
-                                         field_id.get()) {
-                        data[i] = random() % N;
+                    if (schema->get_primary_field_id()->get() ==
+                        field_id.get()) {
+                        data[i] = random_pk
+                                      ? random() % N + pk_offset
+                                      : i / repeat_count + pk_offset;
                     } else {
                         data[i] = i / repeat_count;
                     }
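
The new pk_offset parameter sits between seed and ts_offset, which is why every DataGen call site above gained an extra 0. As a hedged usage example (values assumed, not taken from this commit), two batches with disjoint primary keys can be generated like this:

// non-random pk path: data[i] = i / repeat_count + pk_offset,
// so batch1 holds pks 0..N-1 and batch2 holds pks N..2N-1
auto batch1 = DataGen(schema, N, /*seed=*/42, /*pk_offset=*/0, /*ts_offset=*/0);
auto batch2 = DataGen(schema, N, /*seed=*/42, /*pk_offset=*/N, /*ts_offset=*/N);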
2 changes: 1 addition & 1 deletion tests/python_client/testcases/test_query.py
@@ -2996,7 +2996,7 @@ def test_count_duplicate_ids(self):
         # query count
         collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
                            check_task=CheckTasks.check_query_results,
-                           check_items={exp_res: [{count: tmp_nb}]}
+                           check_items={exp_res: [{count: 1}]}
                            )

         # delete and verify count
