diff --git a/src/yb/master/xcluster/add_table_to_xcluster_source_task.cc b/src/yb/master/xcluster/add_table_to_xcluster_source_task.cc
index a0f605defdc4..2a2d0d581550 100644
--- a/src/yb/master/xcluster/add_table_to_xcluster_source_task.cc
+++ b/src/yb/master/xcluster/add_table_to_xcluster_source_task.cc
@@ -65,6 +65,12 @@ Status AddTableToXClusterSourceTask::MarkTableAsCheckpointed() {
   RETURN_NOT_OK(outbound_replication_group_->MarkNewTablesAsCheckpointed(
       table_info_->namespace_id(), table_info_->id(), epoch_));

+  const auto stream_id = VERIFY_RESULT(
+      outbound_replication_group_->GetStreamId(table_info_->namespace_id(), table_info_->id()));
+  LOG_WITH_PREFIX(INFO)
+      << "Table " << table_info_->ToString()
+      << " successfully checkpointed for xCluster universe replication with Stream: " << stream_id;
+
   Complete();
   return Status::OK();
 }
diff --git a/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc b/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc
index d601673bd6a4..ef92de9fd0ce 100644
--- a/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc
+++ b/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc
@@ -261,8 +261,8 @@ Status AddTableToXClusterTargetTask::WaitForXClusterSafeTimeCaughtUp() {
     return Status::OK();
   }

-  LOG(INFO) << "Table " << table_info_->ToString()
-            << " successfully added to xCluster universe replication";
+  LOG_WITH_PREFIX(INFO) << "Table " << table_info_->ToString()
+                        << " successfully added to xCluster universe replication";

   RETURN_NOT_OK(CleanupAndComplete());
   return Status::OK();
diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc
index 165cccf7aea4..31c96d670b06 100644
--- a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc
+++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc
@@ -1132,4 +1132,17 @@ Result<std::vector<NamespaceId>> XClusterOutboundReplicationGroup::GetNamespaces
   return namespace_ids;
 }

+Result<xrepl::StreamId> XClusterOutboundReplicationGroup::GetStreamId(
+    const NamespaceId& namespace_id, const TableId& table_id) const {
+  SharedLock mutex_lock(mutex_);
+  auto l = VERIFY_RESULT(LockForRead());
+
+  auto* ns_info = VERIFY_RESULT(GetNamespaceInfo(namespace_id));
+  auto* table_info = FindOrNull(ns_info->table_infos(), table_id);
+
+  SCHECK(table_info, NotFound, "Table $0 not found in $1", table_id, namespace_id);
+
+  return table_info->stream_id();
+}
+
 }  // namespace yb::master
diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.h b/src/yb/master/xcluster/xcluster_outbound_replication_group.h
index 7719e42462b4..6e919220ced7 100644
--- a/src/yb/master/xcluster/xcluster_outbound_replication_group.h
+++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.h
@@ -135,6 +135,9 @@ class XClusterOutboundReplicationGroup

   Result<std::vector<NamespaceId>> GetNamespaces() const EXCLUDES(mutex_);

+  Result<xrepl::StreamId> GetStreamId(const NamespaceId& namespace_id, const TableId& table_id) const
+      EXCLUDES(mutex_);
+
  private:
  friend class XClusterOutboundReplicationGroupMocked;
  friend class AddTableToXClusterSourceTask;
diff --git a/src/yb/tserver/xcluster_ddl_queue_handler-test.cc b/src/yb/tserver/xcluster_ddl_queue_handler-test.cc
index da3b4610fe2b..412d82349a0d 100644
--- a/src/yb/tserver/xcluster_ddl_queue_handler-test.cc
+++ b/src/yb/tserver/xcluster_ddl_queue_handler-test.cc
@@ -43,7 +43,7 @@ class XClusterDDLQueueHandlerMocked : public XClusterDDLQueueHandler {
       const client::YBTableName& table_name, MockTserverXClusterContext& xcluster_context)
       : XClusterDDLQueueHandler(
             /* local_client */ nullptr, table_name.namespace_name(), table_name.namespace_id(),
-            xcluster_context, /* connect_to_pg_func */ nullptr) {}
+            /* log_prefix */ "", xcluster_context, /* connect_to_pg_func */ nullptr) {}

   Status ProcessDDLQueueTable(
       const std::optional<HybridTime>& apply_safe_time, int num_records = 1) {
diff --git a/src/yb/tserver/xcluster_ddl_queue_handler.cc b/src/yb/tserver/xcluster_ddl_queue_handler.cc
index 098a9c3cf0de..85ac495b2d7f 100644
--- a/src/yb/tserver/xcluster_ddl_queue_handler.cc
+++ b/src/yb/tserver/xcluster_ddl_queue_handler.cc
@@ -49,9 +49,6 @@ DEFINE_test_flag(bool, xcluster_ddl_queue_handler_fail_at_end, false,
 #define HAS_MEMBER_OF_TYPE(doc, member_name, is_type) \
   (doc.HasMember(member_name) && doc[member_name].is_type())

-#define LOG_QUERY(query) \
-  LOG_IF(INFO, FLAGS_TEST_xcluster_ddl_queue_handler_log_queries) << "Running query: " << query;
-
 namespace yb::tserver {

 namespace {
@@ -103,11 +100,12 @@ Result<rapidjson::Document> ParseSerializedJson(const std::string& raw_json_data

 XClusterDDLQueueHandler::XClusterDDLQueueHandler(
     client::YBClient* local_client, const NamespaceName& namespace_name,
-    const NamespaceId& namespace_id, TserverXClusterContextIf& xcluster_context,
-    ConnectToPostgresFunc connect_to_pg_func)
+    const NamespaceId& namespace_id, const std::string& log_prefix,
+    TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func)
     : local_client_(local_client),
       namespace_name_(namespace_name),
       namespace_id_(namespace_id),
+      log_prefix_(log_prefix),
       xcluster_context_(xcluster_context),
       connect_to_pg_func_(std::move(connect_to_pg_func)) {}

@@ -129,7 +127,7 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR
   // We don't expect to get an invalid safe time, but it is possible in edge cases (see #21528).
   // Log an error and return for now, wait until a valid safe time does come in so we can continue.
   if (target_safe_ht.is_special()) {
-    LOG(WARNING) << "Received invalid safe time " << target_safe_ht;
+    LOG_WITH_PREFIX(WARNING) << "Received invalid safe time " << target_safe_ht;
     return Status::OK();
   }

@@ -160,7 +158,7 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR
       continue;
     }

-    VLOG(1) << "ProcessDDLQueueTable: Processing entry " << query_info.ToString();
+    VLOG_WITH_PREFIX(1) << "ProcessDDLQueueTable: Processing entry " << query_info.ToString();

     SCHECK(
         kSupportedCommandTags.contains(query_info.command_tag), InvalidArgument,
@@ -176,6 +174,9 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR

     RETURN_NOT_OK(ProcessNewRelations(doc, query_info.schema, new_relations));
     RETURN_NOT_OK(ProcessDDLQuery(query_info));
+
+    VLOG_WITH_PREFIX(2) << "ProcessDDLQueueTable: Successfully processed entry "
+                        << query_info.ToString();
   }

   if (FLAGS_TEST_xcluster_ddl_queue_handler_fail_at_end) {
@@ -269,7 +270,7 @@ Status XClusterDDLQueueHandler::ProcessManualExecutionQuery(const DDLQueryInfo&
 }

 Status XClusterDDLQueueHandler::RunAndLogQuery(const std::string& query) {
-  LOG_IF(INFO, FLAGS_TEST_xcluster_ddl_queue_handler_log_queries)
+  LOG_IF_WITH_PREFIX(INFO, FLAGS_TEST_xcluster_ddl_queue_handler_log_queries)
       << "XClusterDDLQueueHandler: Running query: " << query;
   return pg_conn_->Execute(query);
 }
diff --git a/src/yb/tserver/xcluster_ddl_queue_handler.h b/src/yb/tserver/xcluster_ddl_queue_handler.h
index 0a4eac5869e4..ec3e33f3048d 100644
--- a/src/yb/tserver/xcluster_ddl_queue_handler.h
+++ b/src/yb/tserver/xcluster_ddl_queue_handler.h
@@ -34,8 +34,8 @@ class XClusterDDLQueueHandler {
  public:
   XClusterDDLQueueHandler(
       client::YBClient* local_client, const NamespaceName& namespace_name,
-      const NamespaceId& namespace_id, TserverXClusterContextIf& xcluster_context,
-      ConnectToPostgresFunc connect_to_pg_func);
+      const NamespaceId& namespace_id, const std::string& log_prefix,
+      TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func);
   virtual ~XClusterDDLQueueHandler();

   Status ProcessDDLQueueTable(const XClusterOutputClientResponse& response);
@@ -77,11 +77,14 @@ class XClusterDDLQueueHandler {
       rapidjson::Document& doc, const std::string& schema, std::vector& new_relations);

+  const std::string& LogPrefix() const { return log_prefix_; }
+
   client::YBClient* local_client_;
   std::unique_ptr<pgwrapper::PGConn> pg_conn_;

   NamespaceName namespace_name_;
   NamespaceId namespace_id_;
+  const std::string log_prefix_;

   TserverXClusterContextIf& xcluster_context_;
   ConnectToPostgresFunc connect_to_pg_func_;

diff --git a/src/yb/tserver/xcluster_output_client.cc b/src/yb/tserver/xcluster_output_client.cc
index 25e85e1ceca9..adeeec6d5f25 100644
--- a/src/yb/tserver/xcluster_output_client.cc
+++ b/src/yb/tserver/xcluster_output_client.cc
@@ -132,8 +132,8 @@ void XClusterOutputClient::SetLastCompatibleConsumerSchemaVersion(SchemaVersion
   std::lock_guard lock(lock_);
   if (schema_version != cdc::kInvalidSchemaVersion &&
       schema_version > last_compatible_consumer_schema_version_) {
-    LOG(INFO) << "Last compatible consumer schema version updated to "
-              << schema_version;
+    LOG_WITH_PREFIX(INFO) << "Last compatible consumer schema version updated to "
+                          << schema_version;
     last_compatible_consumer_schema_version_ = schema_version;
   }
 }
@@ -263,7 +263,7 @@ Status XClusterOutputClient::SendUserTableWrites() {
     write_request = write_strategy_->FetchNextRequest();
   }
   if (!write_request) {
-    LOG(WARNING) << "Expected to find a write_request but were unable to";
+    LOG_WITH_PREFIX(WARNING) << "Expected to find a write_request but were unable to";
     return STATUS(IllegalState, "Could not find a write request to send");
   }
   SendNextCDCWriteToTablet(std::move(write_request));
@@ -366,15 +366,17 @@ Result<bool> XClusterOutputClient::ProcessChangeMetadataOp(const cdc::CDCRecordP
   if (record.change_metadata_request().has_remove_table_id() ||
       !record.change_metadata_request().add_multiple_tables().empty()) {
     // TODO (#16557): Support remove_table_id() for colocated tables / tablegroups.
-    LOG(INFO) << "Ignoring change metadata request to add multiple/remove tables to tablet : "
-              << producer_tablet_info_.tablet_id;
+    LOG_WITH_PREFIX(INFO)
+        << "Ignoring change metadata request to add multiple/remove tables to tablet : "
+        << producer_tablet_info_.tablet_id;
     return true;
   }

   if (!record.change_metadata_request().has_schema() &&
       !record.change_metadata_request().has_add_table()) {
-    LOG(INFO) << "Ignoring change metadata request for tablet : " << producer_tablet_info_.tablet_id
-              << " as it does not contain any schema. ";
+    LOG_WITH_PREFIX(INFO) << "Ignoring change metadata request for tablet : "
+                          << producer_tablet_info_.tablet_id
+                          << " as it does not contain any schema. ";
     return true;
   }

@@ -396,7 +398,7 @@ Result<bool> XClusterOutputClient::ProcessChangeMetadataOp(const cdc::CDCRecordP

   if (cached_schema_versions &&
       cached_schema_versions->contains(record.change_metadata_request().schema_version())) {
-    LOG(INFO) << Format(
+    LOG_WITH_PREFIX(INFO) << Format(
         "Ignoring change metadata request with schema $0 for tablet $1 as mapping from"
         "producer-consumer schema version already exists",
         schema.DebugString(), producer_tablet_info_.tablet_id);
@@ -538,7 +540,7 @@ void XClusterOutputClient::DoSchemaVersionCheckDone(
     auto msg = Format(
         "XCluster schema mismatch. No matching schema for producer schema $0 with version $1",
         req.schema().DebugString(), producer_schema_version);
-    LOG(WARNING) << msg << ": " << status;
+    LOG_WITH_PREFIX(WARNING) << msg << ": " << status;
     if (resp.error().code() == TabletServerErrorPB::MISMATCHED_SCHEMA) {
       ACQUIRE_MUTEX_IF_ONLINE_ELSE_RETURN;
       xcluster_poller_->StoreReplicationError(replication_error);
@@ -651,11 +653,11 @@ void XClusterOutputClient::DoWriteCDCRecordDone(

 void XClusterOutputClient::HandleError(const Status& s) {
   if (s.IsTryAgain()) {
-    LOG(WARNING) << "Retrying applying replicated record for consumer tablet: "
-                 << consumer_tablet_info_.tablet_id << ", reason: " << s;
+    LOG_WITH_PREFIX(WARNING) << "Retrying applying replicated record for consumer tablet: "
+                             << consumer_tablet_info_.tablet_id << ", reason: " << s;
   } else {
-    LOG(ERROR) << "Error while applying replicated record: " << s
-               << ", consumer tablet: " << consumer_tablet_info_.tablet_id;
+    LOG_WITH_PREFIX(ERROR) << "Error while applying replicated record: " << s
+                           << ", consumer tablet: " << consumer_tablet_info_.tablet_id;
   }
   {
     ACQUIRE_MUTEX_IF_ONLINE_ELSE_RETURN;
diff --git a/src/yb/tserver/xcluster_poller.cc b/src/yb/tserver/xcluster_poller.cc
index a2f6d021c418..9e351147a670 100644
--- a/src/yb/tserver/xcluster_poller.cc
+++ b/src/yb/tserver/xcluster_poller.cc
@@ -136,7 +136,7 @@ XClusterPoller::XClusterPoller(
 }

 XClusterPoller::~XClusterPoller() {
-  VLOG(1) << "Destroying XClusterPoller";
+  VLOG_WITH_PREFIX(1) << "Destroying XClusterPoller";
   DCHECK(shutdown_);
 }

@@ -154,7 +154,7 @@ void XClusterPoller::InitDDLQueuePoller(
   Init(use_local_tserver, rate_limiter);

   ddl_queue_handler_ = std::make_shared<XClusterDDLQueueHandler>(
-      &local_client_, namespace_name, consumer_namespace_id_, xcluster_context,
+      &local_client_, namespace_name, consumer_namespace_id_, LogPrefix(), xcluster_context,
       std::move(connect_to_pg_func));
 }

@@ -261,9 +261,10 @@ void XClusterPoller::DoSetSchemaVersion(
   // Re-enable polling. last_task_schedule_time_ is already current as it was set by the caller
   // function ScheduleSetSchemaVersionIfNeeded.
   if (!is_polling_.exchange(true)) {
-    LOG(INFO) << "Restarting polling on " << producer_tablet_info_.tablet_id
-              << " Producer schema version : " << validated_schema_version_
-              << " Consumer schema version : " << last_compatible_consumer_schema_version_;
+    LOG_WITH_PREFIX(INFO) << "Restarting polling on " << producer_tablet_info_.tablet_id
+                          << " Producer schema version : " << validated_schema_version_
+                          << " Consumer schema version : "
+                          << last_compatible_consumer_schema_version_;
     ScheduleFunc(BIND_FUNCTION_AND_ARGS(XClusterPoller::DoPoll));
   }
 }
@@ -277,7 +278,7 @@ HybridTime XClusterPoller::GetSafeTime() const {
 void XClusterPoller::UpdateSafeTime(int64 new_time) {
   HybridTime new_hybrid_time(new_time);
   if (new_hybrid_time.is_special()) {
-    LOG(WARNING) << "Received invalid xCluster safe time: " << new_hybrid_time;
+    LOG_WITH_PREFIX(WARNING) << "Received invalid xCluster safe time: " << new_hybrid_time;
     return;
   }

@@ -522,9 +523,11 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
     if (s.IsTryAgain()) {
       // The handler will return try again when waiting for safe time to catch up, so can log
       // these errors less frequently.
-      YB_LOG_EVERY_N(WARNING, 300) << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
+      YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 300)
+          << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
     } else {
-      YB_LOG_EVERY_N(WARNING, 30) << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
+      YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 30)
+          << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
     }
     StoreNOKReplicationError();
     if (FLAGS_enable_xcluster_stat_collection) {
@@ -563,8 +566,10 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
   idle_polls_ = (response.processed_record_count == 0) ? idle_polls_ + 1 : 0;

   if (validated_schema_version_ < response.wait_for_version) {
-    LOG(WARNING) << "Pausing Poller since producer schema version " << response.wait_for_version
-                 << " is higher than consumer schema version " << validated_schema_version_;
+    LOG_WITH_PREFIX(WARNING) << "Pausing Poller since producer schema version "
+                             << response.wait_for_version
+                             << " is higher than consumer schema version "
+                             << validated_schema_version_;
     is_polling_ = false;
     validated_schema_version_ = response.wait_for_version - 1;
     return;
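
The log-prefix plumbing above follows the usual LogPrefix() convention: the *_WITH_PREFIX logging macros prepend the owning object's LogPrefix() string to every message, which is why XClusterDDLQueueHandler only needs to store log_prefix_ and expose a LogPrefix() accessor for the macros to pick up. A minimal self-contained sketch of that pattern, with illustrative names rather than the real YugabyteDB logging macros:

#include <iostream>
#include <string>
#include <utility>

// Illustrative stand-in for the *_WITH_PREFIX macros: each call site routes
// through LogPrefix() on the enclosing object, so every line carries the
// owning poller/handler identity without repeating it manually.
#define LOG_WITH_PREFIX_SKETCH(msg) std::cout << LogPrefix() << msg << std::endl

class DdlQueueHandlerSketch {
 public:
  explicit DdlQueueHandlerSketch(std::string log_prefix)
      : log_prefix_(std::move(log_prefix)) {}

  void RunAndLogQuery(const std::string& query) {
    // The prefix is added implicitly by the macro.
    LOG_WITH_PREFIX_SKETCH("Running query: " << query);
  }

 private:
  const std::string& LogPrefix() const { return log_prefix_; }

  const std::string log_prefix_;
};

int main() {
  // In the diff, the prefix handed to the handler is XClusterPoller::LogPrefix(),
  // so handler and poller messages for the same tablet line up in the logs.
  DdlQueueHandlerSketch handler("[SomeReplicationGroup, tablet t1] ");
  handler.RunAndLogQuery("CREATE TABLE t (k INT)");
  return 0;
}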
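
The new XClusterOutboundReplicationGroup::GetStreamId() is a plain read-path accessor: take the shared (reader) side of mutex_ (the EXCLUDES(mutex_) annotation in the header documents that the caller must not already hold it), take a read lock on the group's persisted state, look the table up, and fail with NotFound when it is absent. A rough standalone equivalent using only the standard library, with hypothetical names and std::optional standing in for Result/SCHECK:

#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical sketch of a reader-locked lookup; not the real
// XClusterOutboundReplicationGroup internals.
class ReplicationGroupSketch {
 public:
  // Const read path: only the shared lock is taken, so concurrent readers
  // do not block each other.
  std::optional<std::string> GetStreamId(const std::string& table_id) const {
    std::shared_lock lock(mutex_);
    auto it = table_to_stream_.find(table_id);
    if (it == table_to_stream_.end()) {
      return std::nullopt;  // The real code returns a NotFound status here.
    }
    return it->second;
  }

  void AddTable(const std::string& table_id, const std::string& stream_id) {
    std::unique_lock lock(mutex_);  // Writers take the exclusive side.
    table_to_stream_[table_id] = stream_id;
  }

 private:
  mutable std::shared_mutex mutex_;
  std::unordered_map<std::string, std::string> table_to_stream_;  // table id -> stream id
};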