diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc
index 02723fad5815..016bb80336ef 100644
--- a/ent/src/yb/cdc/cdc_service.cc
+++ b/ent/src/yb/cdc/cdc_service.cc
@@ -552,14 +552,31 @@ class CDCServiceImpl::Impl {
         CoarseMonoClock::Now() >
             it->cdc_state_checkpoint.last_active_time +
                 MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) {
+      LOG(ERROR) << "Stream ID: " << producer_tablet.stream_id
+                 << " expired for Tablet ID: " << producer_tablet.tablet_id
+                 << " with active time: "
+                 << it->cdc_state_checkpoint.last_active_time.time_since_epoch();
       return STATUS_FORMAT(
           InternalError, "stream ID $0 is expired for Tablet ID $1", producer_tablet.stream_id,
           producer_tablet.tablet_id);
     }
+    VLOG(1) << "Tablet: " << producer_tablet.ToString()
+            << " found in CDCService Cache with active time: "
+            << it->cdc_state_checkpoint.last_active_time.time_since_epoch();
   }
   return Status::OK();
 }
+
+  Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet) {
+    SharedLock l(mutex_);
+    auto it = tablet_checkpoints_.find(producer_tablet);
+    if (it != tablet_checkpoints_.end()) {
+      return it->cdc_state_checkpoint;
+    }
+    return STATUS_FORMAT(
+        InternalError, "Tablet info: $0 not found in cache.", producer_tablet.ToString());
+  }
+
   void UpdateActiveTime(const ProducerTabletInfo& producer_tablet) {
     SharedLock l(mutex_);
     auto it = tablet_checkpoints_.find(producer_tablet);
@@ -1177,6 +1194,11 @@ Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCService
   return all_tablets;
 }
 
+Result<TabletCheckpoint> CDCServiceImpl::TEST_GetTabletInfoFromCache(
+    const ProducerTabletInfo& producer_tablet) {
+  return impl_->TEST_GetTabletInfoFromCache(producer_tablet);
+}
+
 void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
                                 GetChangesResponsePB* resp,
                                 RpcContext context) {
@@ -1402,6 +1424,7 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
   std::vector<client::internal::RemoteTabletServer*> servers;
   RETURN_NOT_OK(GetTServers(tablet_id, &servers));
+  auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id));
 
   for (const auto &server : servers) {
     if (server->IsLocal()) {
       // We modify our log directly. Avoid calling itself through the proxy.
@@ -1417,6 +1440,12 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
     cdc_checkpoint_min.cdc_sdk_op_id.ToPB(update_index_req.add_cdc_sdk_consumed_ops());
     update_index_req.add_cdc_sdk_ops_expiration_ms(
         cdc_checkpoint_min.cdc_sdk_op_id_expiration.ToMilliseconds());
+    // Don't update the active time on the TABLET LEADER; only update it on the FOLLOWERS.
+    if (server->permanent_uuid() != ts_leader->permanent_uuid()) {
+      for (auto& stream_id : cdc_checkpoint_min.active_stream_list) {
+        update_index_req.add_stream_ids(stream_id);
+      }
+    }
 
     rpc::RpcController rpc;
     rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
@@ -1702,7 +1731,7 @@ Result<std::shared_ptr<yb::client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
            << ", last replicated time: " << last_replicated_time_str;
 
    // Add the {tablet_id, stream_id} pair to the set if its checkpoint is OpId::Max().
-    if (checkpoint == OpId::Max().ToString()) {
+    if (tablet_stream_to_be_deleted && checkpoint == OpId::Max().ToString()) {
      tablet_stream_to_be_deleted->insert({tablet_id, stream_id});
    }
 
@@ -1743,7 +1772,11 @@ Result<std::shared_ptr<yb::client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
       // Check stream associated with the tablet is active or not.
       // Don't consider those inactive stream for the min_checkpoint calculation.
       CoarseTimePoint latest_active_time = CoarseTimePoint::min();
-      if (record.source_type == CDCSDK) {
+      // If the current tserver is the tablet LEADER, notify the FOLLOWER tablets to
+      // update the active_time in their CDCService Cache.
+      std::shared_ptr<tablet::TabletPeer> tablet_peer;
+      Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
+      if (s.ok() && record.source_type == CDCSDK && IsTabletPeerLeader(tablet_peer)) {
         auto status = impl_->CheckStreamActive(producer_tablet);
         if (!status.ok()) {
           // Inactive stream read from cdc_state table are not considered for the minimum
@@ -1758,6 +1791,7 @@ Result<std::shared_ptr<yb::client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
           }
           continue;
         }
+        tablet_min_checkpoint_map[tablet_id].active_stream_list.insert(stream_id);
         latest_active_time = impl_->GetLatestActiveTime(producer_tablet, *result);
       }
 
@@ -2171,6 +2205,14 @@ Result<std::shared_ptr<yb::client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
         cdc_sdk_op, cdc_sdk_op_id_expiration);
     RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
+
+    if (req->stream_ids_size() > 0) {
+      for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) {
+        ProducerTabletInfo producer_tablet = {
+            "" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)};
+        impl_->UpdateActiveTime(producer_tablet);
+      }
+    }
   }
 
   context.RespondSuccess();
 }
diff --git a/ent/src/yb/cdc/cdc_service.h b/ent/src/yb/cdc/cdc_service.h
index b6564fe3529f..2cebfb2473a3 100644
--- a/ent/src/yb/cdc/cdc_service.h
+++ b/ent/src/yb/cdc/cdc_service.h
@@ -86,6 +86,7 @@ struct TabletCDCCheckpointInfo {
   OpId cdc_sdk_op_id = OpId::Invalid();
   MonoDelta cdc_sdk_op_id_expiration = MonoDelta::kZero;
   CoarseTimePoint cdc_sdk_most_active_time = CoarseTimePoint::min();
+  std::unordered_set<CDCStreamId> active_stream_list;
 };
 
 using TabletOpIdMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
@@ -118,6 +119,8 @@ class CDCServiceImpl : public CDCServiceIf {
                      GetCheckpointResponsePB* resp,
                      rpc::RpcContext rpc) override;
 
+  Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet);
+
   // Update peers in other tablet servers about the latest minimum applied cdc index for a specific
   // tablet.
   void UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
index acc5f3ad74a0..8744b50016f9 100644
--- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
+++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
@@ -458,6 +458,22 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
     }
   }
 
+  int FindTserversWithCacheHit(
+      const CDCStreamId stream_id, const TabletId tablet_id, uint32_t num_tservers) {
+    int count = 0;
+    // Check the CDC Service Cache of all the tservers.
+    for (uint32_t i = 0; i < num_tservers; ++i) {
+      const auto& tserver = test_cluster()->mini_tablet_server(i)->server();
+      auto cdc_service = dynamic_cast<CDCServiceImpl*>(
+          tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get());
+      auto status = cdc_service->TEST_GetTabletInfoFromCache({"" /* UUID */, stream_id, tablet_id});
+      if (status.ok()) {
+        count += 1;
+      }
+    }
+    return count;
+  }
+
   void CheckRecord(
       const CDCSDKProtoRecordPB& record, CDCSDKYsqlTest::ExpectedRecord expected_records,
       uint32_t* count) {
@@ -815,6 +831,94 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
     return get_resp;
   }
 
+  Status ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id) {
+    string tool_path = GetToolPath("../bin", "yb-admin");
+    vector<string> argv;
+    argv.push_back(tool_path);
+    argv.push_back("-master_addresses");
+    argv.push_back(AsString(test_cluster_.mini_cluster_->mini_master(0)->bound_rpc_addr()));
+    argv.push_back("leader_stepdown");
+    argv.push_back(tablet_id);
+    argv.push_back(
+        test_cluster()->mini_tablet_server(new_leader_index)->server()->permanent_uuid());
+    RETURN_NOT_OK(Subprocess::Call(argv));
+
+    return Status::OK();
+  }
+
+  void GetTabletLeaderAndAnyFollowerIndex(
+      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
+      size_t* leader_index, size_t* follower_index) {
+    for (auto replica : tablets[0].replicas()) {
+      for (size_t i = 0; i < test_cluster()->num_tablet_servers(); i++) {
+        if (test_cluster()->mini_tablet_server(i)->server()->permanent_uuid() ==
+            replica.ts_info().permanent_uuid()) {
+          if (replica.role() == PeerRole::LEADER) {
+            *leader_index = i;
+            LOG(INFO) << "Found first leader index: " << i;
+          } else if (replica.role() == PeerRole::FOLLOWER) {
+            *follower_index = i;
+            LOG(INFO) << "Found first follower index: " << i;
+          }
+        }
+      }
+    }
+  }
+
+  void CompareExpirationTime(
+      const TabletId& tablet_id, const CoarseTimePoint& prev_leader_expiry_time,
+      size_t current_leader_idx) {
+    ASSERT_OK(WaitFor(
+        [&]() {
+          CoarseTimePoint current_expiry_time;
+          for (auto const& peer : test_cluster()->GetTabletPeers(current_leader_idx)) {
+            if (peer->tablet_id() == tablet_id) {
+              current_expiry_time = peer->cdc_sdk_min_checkpoint_op_id_expiration();
+              break;
+            }
+          }
+          if (current_expiry_time >= prev_leader_expiry_time) {
+            LOG(INFO) << "The expiry time for the current leader is: "
+                      << current_expiry_time.time_since_epoch().count()
+                      << ", which is at least the previous leader's expiry time: "
+                      << prev_leader_expiry_time.time_since_epoch().count();
+            return true;
+          }
+          return false;
+        },
+        MonoDelta::FromSeconds(60), "Waiting for the active time to be updated in FOLLOWERs"));
+  }
+
+  void CompareCacheActiveTime(
+      const CDCStreamId& stream_id, const TabletId& tablet_id,
+      const CoarseTimePoint& prev_leader_active_time, size_t current_leader_index) {
+    ASSERT_OK(WaitFor(
+        [&]() -> Result<bool> {
+          const auto& first_tserver =
+              test_cluster()->mini_tablet_server(current_leader_index)->server();
+          auto cdc_service =
+              dynamic_cast<CDCServiceImpl*>(first_tserver->rpc_server()
+                                                ->TEST_service_pool("yb.cdc.CDCService")
+                                                ->TEST_get_service()
+                                                .get());
+          auto tablet_info = cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablet_id});
+          if (!tablet_info.ok()) {
+            return false;
+          }
+          if (tablet_info->last_active_time >= prev_leader_active_time) {
+            LOG(INFO) << "current_last_active_time: "
+                      << tablet_info->last_active_time.time_since_epoch().count()
+                      << " prev_leader_active_time: "
+                      << prev_leader_active_time.time_since_epoch().count();
+            return true;
+          }
return false; + }, + MonoDelta::FromSeconds(60), "Waiting for active time is updated in FOLLOWERs.")); + } }; TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBaseFunctions)) { @@ -3067,6 +3171,363 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestMultiStreamOnSameTableAndDele MonoDelta::FromSeconds(60), "Waiting for stream metadata cleanup.")); } +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCreateStreamAfterSetCheckpointMax)) { + FLAGS_update_min_cdc_indices_interval_secs = 1; + // We want to force every GetChanges to update the cdc_state table. + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + + ASSERT_OK(SetUpWithParams(1, 1, false)); + const uint32_t num_tablets = 1; + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + auto resp = + ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + // Insert some records in transaction. + ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + GetChangesResponsePB change_resp; + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + ASSERT_GE(record_size, 100); + LOG(INFO) << "Total records read by GetChanges call on stream_id_1: " << record_size; + + // Forcefully update the checkpoint of the stream as MAX. + OpId commit_op_id = OpId::Max(); + client::TableHandle cdc_state; + client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + ASSERT_OK(cdc_state.Open(cdc_state_table, test_client())); + const auto op = cdc_state.NewUpdateOp(); + auto* const req = op->mutable_request(); + QLAddStringHashValue(req, tablets[0].tablet_id()); + QLAddStringRangeValue(req, stream_id); + cdc_state.AddStringColumnValue(req, master::kCdcCheckpoint, commit_op_id.ToString()); + auto* condition = req->mutable_if_expr()->mutable_condition(); + condition->set_op(QL_OP_EXISTS); + auto session = test_client()->NewSession(); + EXPECT_OK(session->TEST_ApplyAndFlush(op)); + + // Now Read the cdc_state table check checkpoint is updated to MAX. 
+ const auto read_op = cdc_state.NewReadOp(); + auto* const req_read = read_op->mutable_request(); + QLAddStringHashValue(req_read, tablets[0].tablet_id()); + auto req_cond = req->mutable_where_expr()->mutable_condition(); + req_cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition( + req_cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, stream_id); + cdc_state.AddColumns({master::kCdcCheckpoint}, req_read); + + ASSERT_OK(WaitFor( + [&]() -> Result { + EXPECT_OK(session->TEST_ApplyAndFlush(read_op)); + auto row_block = ql::RowsResult(read_op.get()).GetRowBlock(); + if (row_block->row_count() == 1 && + row_block->row(0).column(0).string_value() == OpId::Max().ToString()) { + return true; + } + return false; + }, + MonoDelta::FromSeconds(60), + "Failed to read from cdc_state table.")); + VerifyCdcStateMatches( + test_client(), stream_id, tablets[0].tablet_id(), commit_op_id.term, commit_op_id.index); + + CDCStreamId stream_id_2 = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id_2, tablets)); + ASSERT_FALSE(resp.has_error()); +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderChange)) { + FLAGS_update_min_cdc_indices_interval_secs = 1; + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + FLAGS_cdc_intent_retention_ms = 10000; + // FLAGS_cdc_intent_retention_ms = 1000; + const int num_tservers = 3; + ASSERT_OK(SetUpWithParams(num_tservers, 1, false)); + + const uint32_t num_tablets = 1; + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + EnableCDCServiceInAllTserver(3); + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + // Insert some records in transaction. + ASSERT_OK(WriteRows(0 /* start */, 100 /* end */, &test_cluster_)); + + GetChangesResponsePB change_resp; + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + ASSERT_GE(record_size, 100); + LOG(INFO) << "Total records read by GetChanges call on stream_id_1: " << record_size; + + int cache_hit_tservers = + FindTserversWithCacheHit(stream_id, tablets[0].tablet_id(), num_tservers); + ASSERT_GE(cache_hit_tservers, 1); + + // change LEADER of the tablet to tserver-2 + ASSERT_OK(ChangeLeaderOfTablet(1, tablets[0].tablet_id())); + + // check the condition of cache after LEADER step down. + // we will see prev as well as current LEADER cache, search stream exist. + cache_hit_tservers = FindTserversWithCacheHit(stream_id, tablets[0].tablet_id(), num_tservers); + ASSERT_GE(cache_hit_tservers, 1); + + // Keep refreshing the stream from the new LEADER, till we cross the + // FLAGS_cdc_intent_retention_ms. 
+ int idx = 0; + while (idx < 10) { + auto result = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint())); + idx += 1; + SleepFor(MonoDelta::FromMilliseconds(100)); + } + + // change LEADER of the tablet to tserver-1 + ASSERT_OK(ChangeLeaderOfTablet(0, tablets[0].tablet_id())); + + auto result = GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()); + ASSERT_OK(result); +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderReElect)) { + FLAGS_update_min_cdc_indices_interval_secs = 1; + FLAGS_update_metrics_interval_ms = 1000; + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + const int num_tservers = 3; + ASSERT_OK(SetUpWithParams(num_tservers, 1, false)); + + const uint32_t num_tablets = 1; + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + + auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + // Insert some records in transaction. + ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + GetChangesResponsePB change_resp; + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + ASSERT_GE(record_size, 100); + LOG(INFO) << "Total records read by GetChanges call on stream_id_1: " << record_size; + SleepFor(MonoDelta::FromSeconds(1)); + size_t first_leader_index = 0; + size_t first_follower_index = 0; + GetTabletLeaderAndAnyFollowerIndex(tablets, &first_leader_index, &first_follower_index); + + GetChangesResponsePB change_resp_1 = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + LOG(INFO) << "Number of records after first transaction: " << change_resp_1.records().size(); + + ASSERT_OK(ChangeLeaderOfTablet(first_follower_index, tablets[0].tablet_id())); + + size_t second_leader_index = -1; + google::protobuf::RepeatedPtrField tablets2; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets2, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + for (auto replica : tablets2[0].replicas()) { + if (replica.role() == PeerRole::LEADER) { + for (size_t i = 0; i < test_cluster()->num_tablet_servers(); i++) { + if (test_cluster()->mini_tablet_server(i)->server()->permanent_uuid() == + replica.ts_info().permanent_uuid()) { + second_leader_index = i; + LOG(INFO) << "Found second leader index: " << i; + break; + } + } + } + } + + // Insert some records in transaction after first leader stepdown. + ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Call GetChanges so that the last active time is updated on the new leader. 
+ auto result = GetChangesFromCDC(stream_id, tablets2, &change_resp.cdc_sdk_checkpoint()); + + SleepFor(MonoDelta::FromSeconds(2)); + CoarseTimePoint correct_expiry_time; + for (auto const& peer : test_cluster()->GetTabletPeers(second_leader_index)) { + if (peer->tablet_id() == tablets2[0].tablet_id()) { + correct_expiry_time = peer->cdc_sdk_min_checkpoint_op_id_expiration(); + break; + } + } + LOG(INFO) << "The correct expiry time after the final GetChanges call: " + << correct_expiry_time.time_since_epoch().count(); + + const auto& tserver = test_cluster()->mini_tablet_server(second_leader_index)->server(); + auto cdc_service = dynamic_cast( + tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto tablet_info = ASSERT_RESULT( + cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablets[0].tablet_id()})); + auto correct_last_active_time = tablet_info.last_active_time; + // we need to ensure the initial leader get's back leadership + ASSERT_OK(ChangeLeaderOfTablet(first_follower_index, tablets[0].tablet_id())); + + // Call the test RPC to get last active time of the current leader (original), and it should + // be lower than the previously recorded last_active_time. + // SleepFor(MonoDelta::FromSeconds(2)); + CompareExpirationTime(tablets2[0].tablet_id(), correct_expiry_time, first_leader_index); + CompareCacheActiveTime( + stream_id, tablets[0].tablet_id(), correct_last_active_time, first_leader_index); +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart)) { + FLAGS_update_min_cdc_indices_interval_secs = 1; + FLAGS_update_metrics_interval_ms = 1000; + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + const int num_tservers = 3; + ASSERT_OK(SetUpWithParams(num_tservers, 1, false)); + + // RF: 3, num of tservers: 4. + for (int i = 0; i < 1; ++i) { + ASSERT_OK(test_cluster()->AddTabletServer()); + ASSERT_OK(test_cluster()->WaitForAllTabletServers()); + LOG(INFO) << "Added new TServer to test cluster"; + } + + const uint32_t num_tablets = 1; + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + size_t first_leader_index = 0; + size_t first_follower_index = 0; + GetTabletLeaderAndAnyFollowerIndex(tablets, &first_leader_index, &first_follower_index); + + // Insert some records in transaction. + ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + GetChangesResponsePB change_resp; + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + ASSERT_GE(record_size, 100); + LOG(INFO) << "Total records read by GetChanges call on stream_id_1: " << record_size; + SleepFor(MonoDelta::FromSeconds(10)); + + ASSERT_OK(ChangeLeaderOfTablet(first_follower_index, tablets[0].tablet_id())); + + // Shutdown tserver hosting tablet leader. 
+ test_cluster()->mini_tablet_server(first_leader_index)->Shutdown(); + LOG(INFO) << "TServer hosting tablet leader shutdown"; + SleepFor(MonoDelta::FromSeconds(10)); + + size_t second_leader_index = -1; + size_t second_follower_index = -1; + google::protobuf::RepeatedPtrField tablets2; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets2, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + for (auto replica : tablets2[0].replicas()) { + if (replica.role() == PeerRole::LEADER) { + for (size_t i = 0; i < test_cluster()->num_tablet_servers(); i++) { + if (i == first_leader_index) continue; + if (test_cluster()->mini_tablet_server(i)->server()->permanent_uuid() == + replica.ts_info().permanent_uuid()) { + second_leader_index = i; + LOG(INFO) << "Found second leader index: " << i; + break; + } + } + } + if (replica.role() == PeerRole::FOLLOWER) { + for (size_t i = 0; i < test_cluster()->num_tablet_servers(); i++) { + if (i == first_leader_index) continue; + if (test_cluster()->mini_tablet_server(i)->server()->permanent_uuid() == + replica.ts_info().permanent_uuid()) { + second_follower_index = i; + LOG(INFO) << "Found second follower index: " << i; + break; + } + } + } + } + + // restart the initial leader tserver + ASSERT_OK(test_cluster()->mini_tablet_server(first_leader_index)->Start()); + ASSERT_OK(test_cluster()->mini_tablet_server(first_leader_index)->WaitStarted()); + // Insert some records in transaction after leader shutdown. + ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Call GetChanges so that the last active time is updated on the new leader. + auto result = GetChangesFromCDC(stream_id, tablets2, &change_resp.cdc_sdk_checkpoint()); + + SleepFor(MonoDelta::FromSeconds(2)); + CoarseTimePoint correct_expiry_time; + for (auto const& peer : test_cluster()->GetTabletPeers(second_leader_index)) { + if (peer->tablet_id() == tablets2[0].tablet_id()) { + correct_expiry_time = peer->cdc_sdk_min_checkpoint_op_id_expiration(); + } + } + LOG(INFO) << "The correct expiry time after the final GetChanges call: " + << correct_expiry_time.time_since_epoch().count(); + + const auto& tserver = test_cluster()->mini_tablet_server(second_leader_index)->server(); + auto cdc_service = dynamic_cast( + tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto tablet_info = ASSERT_RESULT( + cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablets[0].tablet_id()})); + auto correct_last_active_time = tablet_info.last_active_time; + + ASSERT_OK(ChangeLeaderOfTablet(second_follower_index, tablets2[0].tablet_id())); + // Now shutdown the second leader again, so that the previous leader can become the leader again. + test_cluster()->mini_tablet_server(second_leader_index)->Shutdown(); + + // We need to ensure the initial leader get's back leadership. + ASSERT_OK(ChangeLeaderOfTablet(first_leader_index, tablets2[0].tablet_id())); + + // Call the test RPC to get last active time of the current leader (original), and it will + // be lower than the previously recorded last_active_time. 
+  CompareExpirationTime(tablets2[0].tablet_id(), correct_expiry_time, first_leader_index);
+  CompareCacheActiveTime(
+      stream_id, tablets2[0].tablet_id(), correct_last_active_time, first_leader_index);
+}
+
 }  // namespace enterprise
 }  // namespace cdc
 }  // namespace yb
diff --git a/src/yb/cdc/cdc_service.proto b/src/yb/cdc/cdc_service.proto
index 694a22525b97..c723e3044103 100644
--- a/src/yb/cdc/cdc_service.proto
+++ b/src/yb/cdc/cdc_service.proto
@@ -411,6 +411,7 @@ message UpdateCdcReplicatedIndexRequestPB {
   // Minimum checkpoint term and index for CDCSDK stream, to do intent cleanup for this tablet.
   repeated OpIdPB cdc_sdk_consumed_ops = 7;
   repeated uint64 cdc_sdk_ops_expiration_ms = 8;
+  repeated bytes stream_ids = 9;
 }
 
 message UpdateCdcReplicatedIndexResponsePB {
diff --git a/src/yb/tablet/tablet_peer.cc b/src/yb/tablet/tablet_peer.cc
index 704ea12a2309..97b0ce5db97b 100644
--- a/src/yb/tablet/tablet_peer.cc
+++ b/src/yb/tablet/tablet_peer.cc
@@ -1024,6 +1024,15 @@ OpId TabletPeer::cdc_sdk_min_checkpoint_op_id() {
   return meta_->cdc_sdk_min_checkpoint_op_id();
 }
 
+CoarseTimePoint TabletPeer::cdc_sdk_min_checkpoint_op_id_expiration() {
+  auto txn_participant = tablet()->transaction_participant();
+  if (txn_participant) {
+    return txn_participant->GetCheckpointExpirationTime();
+  }
+
+  return CoarseTimePoint();
+}
+
 Status TabletPeer::SetCDCSDKRetainOpIdAndTime(
     const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration) {
   if (cdc_sdk_op_id == OpId::Invalid()) {
diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h
index b1c04b6eef46..fb5eda93f7bf 100644
--- a/src/yb/tablet/tablet_peer.h
+++ b/src/yb/tablet/tablet_peer.h
@@ -384,6 +384,8 @@ class TabletPeer : public consensus::ConsensusContext,
   OpId cdc_sdk_min_checkpoint_op_id();
 
+  CoarseTimePoint cdc_sdk_min_checkpoint_op_id_expiration();
+
   Status SetCDCSDKRetainOpIdAndTime(
       const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration);
diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc
index 44a19b3a5153..26c128c75552 100644
--- a/src/yb/tablet/transaction_participant.cc
+++ b/src/yb/tablet/transaction_participant.cc
@@ -369,6 +369,11 @@ class TransactionParticipant::Impl
     return cdc_sdk_min_checkpoint_op_id_;
   }
 
+  CoarseTimePoint GetCheckpointExpirationTime() {
+    std::lock_guard lock(mutex_);
+    return cdc_sdk_min_checkpoint_op_id_expiration_;
+  }
+
   // Cleans transactions that are requested and now is safe to clean.
   // See RemoveUnlocked for details.
   void CleanTransactionsUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
@@ -1827,5 +1832,9 @@ OpId TransactionParticipant::GetRetainOpId() const {
   return impl_->GetRetainOpId();
 }
 
+CoarseTimePoint TransactionParticipant::GetCheckpointExpirationTime() const {
+  return impl_->GetCheckpointExpirationTime();
+}
+
 }  // namespace tablet
 }  // namespace yb
diff --git a/src/yb/tablet/transaction_participant.h b/src/yb/tablet/transaction_participant.h
index 4350d6a66b1c..22a2b278c415 100644
--- a/src/yb/tablet/transaction_participant.h
+++ b/src/yb/tablet/transaction_participant.h
@@ -214,6 +214,8 @@ class TransactionParticipant : public TransactionStatusManager {
   OpId GetRetainOpId() const;
 
+  CoarseTimePoint GetCheckpointExpirationTime() const;
+
   const TabletId& tablet_id() const override;
 
   size_t TEST_GetNumRunningTransactions() const;
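
The patch above can be read as one mechanism: the tablet LEADER piggybacks the IDs of the CDCSDK streams it has seen as active onto UpdateCdcReplicatedIndexRequestPB (the new repeated stream_ids field), and every FOLLOWER that receives the request refreshes last_active_time in its local CDCService cache, so a stream that is still being polled does not expire (and lose its retained intents) after a leadership change. The sketch below is a minimal, self-contained C++ illustration of that bookkeeping, not YugabyteDB code; CdcCache, StreamKey, and the 10-second retention value are illustrative stand-ins for the Impl tablet-checkpoint cache, ProducerTabletInfo, and FLAGS_cdc_intent_retention_ms.

// Minimal sketch (not YugabyteDB code) of the follower-side active-time refresh.
#include <chrono>
#include <iostream>
#include <map>
#include <string>
#include <tuple>
#include <vector>

using Clock = std::chrono::steady_clock;

// Stand-in for ProducerTabletInfo: identifies one {stream, tablet} cache entry.
struct StreamKey {
  std::string stream_id;
  std::string tablet_id;
  bool operator<(const StreamKey& other) const {
    return std::tie(stream_id, tablet_id) < std::tie(other.stream_id, other.tablet_id);
  }
};

// Stand-in for the per-tserver CDCService cache of tablet checkpoints.
class CdcCache {
 public:
  explicit CdcCache(std::chrono::milliseconds retention) : retention_(retention) {}

  // Leader path: every GetChanges call refreshes the entry (cf. Impl::UpdateActiveTime).
  void UpdateActiveTime(const StreamKey& key) { last_active_[key] = Clock::now(); }

  // Follower path: applied when an UpdateCdcReplicatedIndex request carries stream_ids.
  void ApplyLeaderUpdate(const std::string& tablet_id,
                         const std::vector<std::string>& stream_ids) {
    for (const auto& stream_id : stream_ids) {
      UpdateActiveTime({stream_id, tablet_id});
    }
  }

  // Mirrors CheckStreamActive: an entry older than the retention window counts as expired.
  bool IsExpired(const StreamKey& key) const {
    auto it = last_active_.find(key);
    return it == last_active_.end() || Clock::now() > it->second + retention_;
  }

 private:
  std::chrono::milliseconds retention_;
  std::map<StreamKey, Clock::time_point> last_active_;
};

int main() {
  // The retention window plays the role of FLAGS_cdc_intent_retention_ms.
  CdcCache follower_cache(std::chrono::milliseconds(10000));
  // The leader forwards the streams it saw as active; the follower refreshes them locally,
  // so the entry is still live if this node is elected leader next.
  follower_cache.ApplyLeaderUpdate("tablet-1", {"stream-1"});
  std::cout << "expired: " << follower_cache.IsExpired({"stream-1", "tablet-1"}) << std::endl;
  return 0;
}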