Skip to content

Commit

Permalink
[fix](txn_manager) Add ingested rowsets to unused rowsets when removi…
Browse files Browse the repository at this point in the history
…ng txn (#37417)

Generally speaking, as long as a rowset has a version, it can be
considered not to be in a pending state. However, if the rowset was
created through ingesting binlogs, it will have a version but should
still be considered in a pending state because the ingesting txn has not
yet been committed.

This PR updates the condition for determining the pending state. If a
rowset is COMMITTED, the txn should be allowed to roll back even if a
version exists.

Cherry-pick #36551
  • Loading branch information
w41ter authored Jul 10, 2024
1 parent f65d1c4 commit afcc617
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 6 deletions.
11 changes: 10 additions & 1 deletion be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@ static bvar::Adder<size_t> g_total_rowset_num("doris_total_rowset_num");

Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta)
: _rowset_meta(rowset_meta), _refs_by_reader(0) {
_is_pending = !_rowset_meta->has_version();
_is_pending = true;

// Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state.
// However, if the rowset was created through ingesting binlogs, it will have a version but should still be
// considered in a pending state because the ingesting txn has not yet been committed.
if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 &&
_rowset_meta->rowset_state() != COMMITTED) {
_is_pending = false;
}

if (_is_pending) {
_is_cumulative = false;
} else {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }

// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?
Expand Down
10 changes: 8 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,14 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
<< ", tablet_uid=" << tablet_info.first.tablet_uid;
continue;
}
static_cast<void>(StorageEngine::instance()->txn_manager()->delete_txn(
partition_id, tablet, transaction_id));
Status s = StorageEngine::instance()->txn_manager()->delete_txn(partition_id, tablet,
transaction_id);
if (!s.ok()) {
LOG(WARNING) << "failed to clear transaction. txn_id=" << transaction_id
<< ", partition_id=" << partition_id
<< ", tablet_id=" << tablet_info.first.tablet_id
<< ", status=" << s.to_string();
}
}
}
LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id;
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,13 +604,14 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
auto& load_info = load_itr->second;
auto& rowset = load_info->rowset;
if (rowset != nullptr && meta != nullptr) {
if (rowset->version().first > 0) {
if (!rowset->is_pending()) {
return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
"could not delete transaction from engine, just remove it from memory not "
"delete from disk, because related rowset already published. partition_id: "
"{}, transaction_id: {}, tablet: {}, rowset id: {}, version:{}",
"{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}",
key.first, key.second, tablet_info.to_string(),
rowset->rowset_id().to_string(), rowset->version().to_string());
rowset->rowset_id().to_string(), rowset->version().to_string(),
RowsetStatePB_Name(rowset->rowset_meta_state()));
} else {
static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
#ifndef BE_TEST
Expand Down
22 changes: 22 additions & 0 deletions be/test/olap/test_data/rowset_meta3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"rowset_id": 10002,
"partition_id": 10001,
"tablet_id": 12046,
"tablet_schema_hash": 365187263,
"rowset_type": "BETA_ROWSET",
"rowset_state": "COMMITTED",
"start_version": 0,
"end_version": 1,
"num_rows": 0,
"total_disk_size": 0,
"data_disk_size": 0,
"index_disk_size": 0,
"empty": true,
"creation_time": 1552911435,
"tablet_uid": {
"hi": 10,
"lo": 10
},
"num_segments": 1,
"has_variant_type_in_schema": false
}
37 changes: 37 additions & 0 deletions be/test/olap/txn_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static StorageEngine* k_engine = nullptr;

const std::string rowset_meta_path = "./be/test/olap/test_data/rowset_meta.json";
const std::string rowset_meta_path_2 = "./be/test/olap/test_data/rowset_meta2.json";
const std::string rowset_meta_path_3 = "./be/test/olap/test_data/rowset_meta3.json";

class TxnManagerTest : public testing::Test {
public:
Expand Down Expand Up @@ -169,6 +170,22 @@ class TxnManagerTest : public testing::Test {
EXPECT_EQ(rowset_meta2->rowset_id(), rowset_id);
EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, rowset_meta_path_2,
rowset_meta2, &_rowset_diff_id));

// init rowset meta 3
_json_rowset_meta = "";
std::ifstream infile3(rowset_meta_path_3);
char buffer3[1024];
while (!infile3.eof()) {
infile3.getline(buffer3, 1024);
_json_rowset_meta = _json_rowset_meta + buffer3 + "\n";
}
_json_rowset_meta = _json_rowset_meta.substr(0, _json_rowset_meta.size() - 1);
rowset_id.init(10002);
RowsetMetaSharedPtr rowset_meta3(new RowsetMeta());
rowset_meta3->init_from_json(_json_rowset_meta);
EXPECT_EQ(rowset_meta3->rowset_id(), rowset_id);
EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, rowset_meta_path_3,
rowset_meta3, &_rowset_ingested));
_tablet_uid = TabletUid(10, 10);
}

Expand All @@ -190,6 +207,7 @@ class TxnManagerTest : public testing::Test {
RowsetSharedPtr _rowset;
RowsetSharedPtr _rowset_same_id;
RowsetSharedPtr _rowset_diff_id;
RowsetSharedPtr _rowset_ingested;
};

TEST_F(TxnManagerTest, PrepareNewTxn) {
Expand Down Expand Up @@ -363,4 +381,23 @@ TEST_F(TxnManagerTest, TabletVersionCache) {
EXPECT_EQ(tx6, 890);
}

TEST_F(TxnManagerTest, DeleteCommittedTxnForIngestingBinlog) {
auto guard = k_engine->pending_local_rowsets().add(_rowset_ingested->rowset_id());
auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid,
load_id, _rowset_ingested, std::move(guard), false);
ASSERT_TRUE(st.ok()) << st;
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset_ingested->rowset_id(),
rowset_meta);
ASSERT_TRUE(st.ok()) << st;
EXPECT_EQ(rowset_meta->rowset_id(), _rowset_ingested->rowset_id());
st = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid);
ASSERT_TRUE(st.ok()) << st;
RowsetMetaSharedPtr rowset_meta2(new RowsetMeta());
st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset_ingested->rowset_id(),
rowset_meta2);
ASSERT_FALSE(st.ok()) << st;
EXPECT_FALSE(k_engine->pending_local_rowsets().contains(_rowset_ingested->rowset_id()));
}

} // namespace doris

0 comments on commit afcc617

Please sign in to comment.