Skip to content

Commit

Permalink
[#23702] xClusterDDLRepl: Add extra logging
Browse files Browse the repository at this point in the history
Summary:
Adding extra logging to help with debugging, and updating existing logs to use prefixes when we can.
Jira: DB-12611

Test Plan: Jenkins

Reviewers: hsunder, xCluster

Reviewed By: hsunder

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D37628
  • Loading branch information
hulien22 committed Aug 29, 2024
1 parent 2cf648b commit b14851d
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 36 deletions.
6 changes: 6 additions & 0 deletions src/yb/master/xcluster/add_table_to_xcluster_source_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ Status AddTableToXClusterSourceTask::MarkTableAsCheckpointed() {
RETURN_NOT_OK(outbound_replication_group_->MarkNewTablesAsCheckpointed(
table_info_->namespace_id(), table_info_->id(), epoch_));

const auto stream_id = VERIFY_RESULT(
outbound_replication_group_->GetStreamId(table_info_->namespace_id(), table_info_->id()));
LOG_WITH_PREFIX(INFO)
<< "Table " << table_info_->ToString()
<< " successfully checkpointed for xCluster universe replication with Stream: " << stream_id;

Complete();
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions src/yb/master/xcluster/add_table_to_xcluster_target_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ Status AddTableToXClusterTargetTask::WaitForXClusterSafeTimeCaughtUp() {
return Status::OK();
}

LOG(INFO) << "Table " << table_info_->ToString()
<< " successfully added to xCluster universe replication";
LOG_WITH_PREFIX(INFO) << "Table " << table_info_->ToString()
<< " successfully added to xCluster universe replication";

RETURN_NOT_OK(CleanupAndComplete());
return Status::OK();
Expand Down
13 changes: 13 additions & 0 deletions src/yb/master/xcluster/xcluster_outbound_replication_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1132,4 +1132,17 @@ Result<std::vector<NamespaceId>> XClusterOutboundReplicationGroup::GetNamespaces
return namespace_ids;
}

Result<std::string> XClusterOutboundReplicationGroup::GetStreamId(
const NamespaceId& namespace_id, const TableId& table_id) const {
SharedLock mutex_lock(mutex_);
auto l = VERIFY_RESULT(LockForRead());

auto* ns_info = VERIFY_RESULT(GetNamespaceInfo(namespace_id));
auto* table_info = FindOrNull(ns_info->table_infos(), table_id);

SCHECK(table_info, NotFound, "Table $0 not found in $1", table_id, namespace_id);

return table_info->stream_id();
}

} // namespace yb::master
3 changes: 3 additions & 0 deletions src/yb/master/xcluster/xcluster_outbound_replication_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class XClusterOutboundReplicationGroup

Result<std::vector<NamespaceId>> GetNamespaces() const EXCLUDES(mutex_);

Result<std::string> GetStreamId(const NamespaceId& namespace_id, const TableId& table_id) const
EXCLUDES(mutex_);

private:
friend class XClusterOutboundReplicationGroupMocked;
friend class AddTableToXClusterSourceTask;
Expand Down
2 changes: 1 addition & 1 deletion src/yb/tserver/xcluster_ddl_queue_handler-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class XClusterDDLQueueHandlerMocked : public XClusterDDLQueueHandler {
const client::YBTableName& table_name, MockTserverXClusterContext& xcluster_context)
: XClusterDDLQueueHandler(
/* local_client */ nullptr, table_name.namespace_name(), table_name.namespace_id(),
xcluster_context, /* connect_to_pg_func */ nullptr) {}
/* log_prefix */ "", xcluster_context, /* connect_to_pg_func */ nullptr) {}

Status ProcessDDLQueueTable(
const std::optional<HybridTime>& apply_safe_time, int num_records = 1) {
Expand Down
17 changes: 9 additions & 8 deletions src/yb/tserver/xcluster_ddl_queue_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ DEFINE_test_flag(bool, xcluster_ddl_queue_handler_fail_at_end, false,
#define HAS_MEMBER_OF_TYPE(doc, member_name, is_type) \
(doc.HasMember(member_name) && doc[member_name].is_type())

#define LOG_QUERY(query) \
LOG_IF(INFO, FLAGS_TEST_xcluster_ddl_queue_handler_log_queries) << "Running query: " << query;

namespace yb::tserver {

namespace {
Expand Down Expand Up @@ -103,11 +100,12 @@ Result<rapidjson::Document> ParseSerializedJson(const std::string& raw_json_data

XClusterDDLQueueHandler::XClusterDDLQueueHandler(
client::YBClient* local_client, const NamespaceName& namespace_name,
const NamespaceId& namespace_id, TserverXClusterContextIf& xcluster_context,
ConnectToPostgresFunc connect_to_pg_func)
const NamespaceId& namespace_id, const std::string& log_prefix,
TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func)
: local_client_(local_client),
namespace_name_(namespace_name),
namespace_id_(namespace_id),
log_prefix_(log_prefix),
xcluster_context_(xcluster_context),
connect_to_pg_func_(std::move(connect_to_pg_func)) {}

Expand All @@ -129,7 +127,7 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR
// We don't expect to get an invalid safe time, but it is possible in edge cases (see #21528).
// Log an error and return for now, wait until a valid safe time does come in so we can continue.
if (target_safe_ht.is_special()) {
LOG(WARNING) << "Received invalid safe time " << target_safe_ht;
LOG_WITH_PREFIX(WARNING) << "Received invalid safe time " << target_safe_ht;
return Status::OK();
}

Expand Down Expand Up @@ -160,7 +158,7 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR
continue;
}

VLOG(1) << "ProcessDDLQueueTable: Processing entry " << query_info.ToString();
VLOG_WITH_PREFIX(1) << "ProcessDDLQueueTable: Processing entry " << query_info.ToString();

SCHECK(
kSupportedCommandTags.contains(query_info.command_tag), InvalidArgument,
Expand All @@ -176,6 +174,9 @@ Status XClusterDDLQueueHandler::ProcessDDLQueueTable(const XClusterOutputClientR

RETURN_NOT_OK(ProcessNewRelations(doc, query_info.schema, new_relations));
RETURN_NOT_OK(ProcessDDLQuery(query_info));

VLOG_WITH_PREFIX(2) << "ProcessDDLQueueTable: Successfully processed entry "
<< query_info.ToString();
}

if (FLAGS_TEST_xcluster_ddl_queue_handler_fail_at_end) {
Expand Down Expand Up @@ -269,7 +270,7 @@ Status XClusterDDLQueueHandler::ProcessManualExecutionQuery(const DDLQueryInfo&
}

Status XClusterDDLQueueHandler::RunAndLogQuery(const std::string& query) {
LOG_IF(INFO, FLAGS_TEST_xcluster_ddl_queue_handler_log_queries)
LOG_IF_WITH_PREFIX(INFO, FLAGS_TEST_xcluster_ddl_queue_handler_log_queries)
<< "XClusterDDLQueueHandler: Running query: " << query;
return pg_conn_->Execute(query);
}
Expand Down
7 changes: 5 additions & 2 deletions src/yb/tserver/xcluster_ddl_queue_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class XClusterDDLQueueHandler {
public:
XClusterDDLQueueHandler(
client::YBClient* local_client, const NamespaceName& namespace_name,
const NamespaceId& namespace_id, TserverXClusterContextIf& xcluster_context,
ConnectToPostgresFunc connect_to_pg_func);
const NamespaceId& namespace_id, const std::string& log_prefix,
TserverXClusterContextIf& xcluster_context, ConnectToPostgresFunc connect_to_pg_func);
virtual ~XClusterDDLQueueHandler();

Status ProcessDDLQueueTable(const XClusterOutputClientResponse& response);
Expand Down Expand Up @@ -77,11 +77,14 @@ class XClusterDDLQueueHandler {
rapidjson::Document& doc, const std::string& schema,
std::vector<YsqlFullTableName>& new_relations);

const std::string& LogPrefix() const { return log_prefix_; }

client::YBClient* local_client_;

std::unique_ptr<pgwrapper::PGConn> pg_conn_;
NamespaceName namespace_name_;
NamespaceId namespace_id_;
const std::string log_prefix_;
TserverXClusterContextIf& xcluster_context_;
ConnectToPostgresFunc connect_to_pg_func_;

Expand Down
28 changes: 15 additions & 13 deletions src/yb/tserver/xcluster_output_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ void XClusterOutputClient::SetLastCompatibleConsumerSchemaVersion(SchemaVersion
std::lock_guard lock(lock_);
if (schema_version != cdc::kInvalidSchemaVersion &&
schema_version > last_compatible_consumer_schema_version_) {
LOG(INFO) << "Last compatible consumer schema version updated to "
<< schema_version;
LOG_WITH_PREFIX(INFO) << "Last compatible consumer schema version updated to "
<< schema_version;
last_compatible_consumer_schema_version_ = schema_version;
}
}
Expand Down Expand Up @@ -263,7 +263,7 @@ Status XClusterOutputClient::SendUserTableWrites() {
write_request = write_strategy_->FetchNextRequest();
}
if (!write_request) {
LOG(WARNING) << "Expected to find a write_request but were unable to";
LOG_WITH_PREFIX(WARNING) << "Expected to find a write_request but were unable to";
return STATUS(IllegalState, "Could not find a write request to send");
}
SendNextCDCWriteToTablet(std::move(write_request));
Expand Down Expand Up @@ -366,15 +366,17 @@ Result<bool> XClusterOutputClient::ProcessChangeMetadataOp(const cdc::CDCRecordP
if (record.change_metadata_request().has_remove_table_id() ||
!record.change_metadata_request().add_multiple_tables().empty()) {
// TODO (#16557): Support remove_table_id() for colocated tables / tablegroups.
LOG(INFO) << "Ignoring change metadata request to add multiple/remove tables to tablet : "
<< producer_tablet_info_.tablet_id;
LOG_WITH_PREFIX(INFO)
<< "Ignoring change metadata request to add multiple/remove tables to tablet : "
<< producer_tablet_info_.tablet_id;
return true;
}

if (!record.change_metadata_request().has_schema() &&
!record.change_metadata_request().has_add_table()) {
LOG(INFO) << "Ignoring change metadata request for tablet : " << producer_tablet_info_.tablet_id
<< " as it does not contain any schema. ";
LOG_WITH_PREFIX(INFO) << "Ignoring change metadata request for tablet : "
<< producer_tablet_info_.tablet_id
<< " as it does not contain any schema. ";
return true;
}

Expand All @@ -396,7 +398,7 @@ Result<bool> XClusterOutputClient::ProcessChangeMetadataOp(const cdc::CDCRecordP

if (cached_schema_versions &&
cached_schema_versions->contains(record.change_metadata_request().schema_version())) {
LOG(INFO) << Format(
LOG_WITH_PREFIX(INFO) << Format(
"Ignoring change metadata request with schema $0 for tablet $1 as mapping from"
"producer-consumer schema version already exists",
schema.DebugString(), producer_tablet_info_.tablet_id);
Expand Down Expand Up @@ -538,7 +540,7 @@ void XClusterOutputClient::DoSchemaVersionCheckDone(
auto msg = Format(
"XCluster schema mismatch. No matching schema for producer schema $0 with version $1",
req.schema().DebugString(), producer_schema_version);
LOG(WARNING) << msg << ": " << status;
LOG_WITH_PREFIX(WARNING) << msg << ": " << status;
if (resp.error().code() == TabletServerErrorPB::MISMATCHED_SCHEMA) {
ACQUIRE_MUTEX_IF_ONLINE_ELSE_RETURN;
xcluster_poller_->StoreReplicationError(replication_error);
Expand Down Expand Up @@ -651,11 +653,11 @@ void XClusterOutputClient::DoWriteCDCRecordDone(

void XClusterOutputClient::HandleError(const Status& s) {
if (s.IsTryAgain()) {
LOG(WARNING) << "Retrying applying replicated record for consumer tablet: "
<< consumer_tablet_info_.tablet_id << ", reason: " << s;
LOG_WITH_PREFIX(WARNING) << "Retrying applying replicated record for consumer tablet: "
<< consumer_tablet_info_.tablet_id << ", reason: " << s;
} else {
LOG(ERROR) << "Error while applying replicated record: " << s
<< ", consumer tablet: " << consumer_tablet_info_.tablet_id;
LOG_WITH_PREFIX(ERROR) << "Error while applying replicated record: " << s
<< ", consumer tablet: " << consumer_tablet_info_.tablet_id;
}
{
ACQUIRE_MUTEX_IF_ONLINE_ELSE_RETURN;
Expand Down
25 changes: 15 additions & 10 deletions src/yb/tserver/xcluster_poller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ XClusterPoller::XClusterPoller(
}

XClusterPoller::~XClusterPoller() {
VLOG(1) << "Destroying XClusterPoller";
VLOG_WITH_PREFIX(1) << "Destroying XClusterPoller";
DCHECK(shutdown_);
}

Expand All @@ -154,7 +154,7 @@ void XClusterPoller::InitDDLQueuePoller(
Init(use_local_tserver, rate_limiter);

ddl_queue_handler_ = std::make_shared<XClusterDDLQueueHandler>(
&local_client_, namespace_name, consumer_namespace_id_, xcluster_context,
&local_client_, namespace_name, consumer_namespace_id_, LogPrefix(), xcluster_context,
std::move(connect_to_pg_func));
}

Expand Down Expand Up @@ -261,9 +261,10 @@ void XClusterPoller::DoSetSchemaVersion(
// Re-enable polling. last_task_schedule_time_ is already current as it was set by the caller
// function ScheduleSetSchemaVersionIfNeeded.
if (!is_polling_.exchange(true)) {
LOG(INFO) << "Restarting polling on " << producer_tablet_info_.tablet_id
<< " Producer schema version : " << validated_schema_version_
<< " Consumer schema version : " << last_compatible_consumer_schema_version_;
LOG_WITH_PREFIX(INFO) << "Restarting polling on " << producer_tablet_info_.tablet_id
<< " Producer schema version : " << validated_schema_version_
<< " Consumer schema version : "
<< last_compatible_consumer_schema_version_;
ScheduleFunc(BIND_FUNCTION_AND_ARGS(XClusterPoller::DoPoll));
}
}
Expand All @@ -277,7 +278,7 @@ HybridTime XClusterPoller::GetSafeTime() const {
void XClusterPoller::UpdateSafeTime(int64 new_time) {
HybridTime new_hybrid_time(new_time);
if (new_hybrid_time.is_special()) {
LOG(WARNING) << "Received invalid xCluster safe time: " << new_hybrid_time;
LOG_WITH_PREFIX(WARNING) << "Received invalid xCluster safe time: " << new_hybrid_time;
return;
}

Expand Down Expand Up @@ -522,9 +523,11 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
if (s.IsTryAgain()) {
// The handler will return try again when waiting for safe time to catch up, so can log
// these errors less frequently.
YB_LOG_EVERY_N(WARNING, 300) << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 300)
<< "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
} else {
YB_LOG_EVERY_N(WARNING, 30) << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 30)
<< "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
}
StoreNOKReplicationError();
if (FLAGS_enable_xcluster_stat_collection) {
Expand Down Expand Up @@ -563,8 +566,10 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
idle_polls_ = (response.processed_record_count == 0) ? idle_polls_ + 1 : 0;

if (validated_schema_version_ < response.wait_for_version) {
LOG(WARNING) << "Pausing Poller since producer schema version " << response.wait_for_version
<< " is higher than consumer schema version " << validated_schema_version_;
LOG_WITH_PREFIX(WARNING) << "Pausing Poller since producer schema version "
<< response.wait_for_version
<< " is higher than consumer schema version "
<< validated_schema_version_;
is_polling_ = false;
validated_schema_version_ = response.wait_for_version - 1;
return;
Expand Down

0 comments on commit b14851d

Please sign in to comment.