From 17c0fa638ac7fc1da8cbac9905663ca9deb0630c Mon Sep 17 00:00:00 2001 From: Amitanand Aiyer Date: Mon, 26 Jul 2021 23:58:09 -0700 Subject: [PATCH] [#7889, #5326] YBASE: Implement chunking/throttling in Tablet::BackfillIndexForYSQL Summary: Depends on https://phabricator.dev.yugabyte.com/D12435 Implement chunking/throttling in Tablet::BackfillIndexesForYsql * At a high level, we loop in Tablet::BackfillIndexesForYsql, calling `BACKFILL INDEX` on postgres until the backfill is done or we are approaching the deadline. * The loop also throttles itself (by sleeping) to maintain the specified rate of backfill. Notes regarding rolling upgrades: * The diff assumes that the TServer issuing the new-format BACKFILL INDEX command only issues it to a postgres process that understands the new format. This holds true because the postgres process that the TServer talks to is the local postgres process. * Postgres/yb-client will read from the "leader" of the tablet. If the leadership has moved, this may be a different tablet/TServer. In that case, it may read/write from/to a TServer running a different version. If that TServer is running the older version, it ignores the backfill_spec field. This results in the "old" behavior, where the BACKFILL INDEX command runs until the end of the tablet range. Perf notes: Connecting to postgres to run a BACKFILL INDEX command may have some fixed setup costs, so having a very small batch size could be inefficient. More detailed description of the changes: ------------ Docdb: # Implement support for backfill_spec. # Set it accordingly while calling BACKFILL INDEX. # Parse/update backfill_spec accordingly while reading pgsql_operation for a backfill read. # Loop around BACKFILL INDEX to implement throttling/progress-in-batches. Test Plan: Tests added. 
ybd --cxx-test pg_index_backfill-test ybd --cxx-test integration-tests_cassandra_cpp_driver-test Reviewers: mihnea, neil, jason Reviewed By: jason Subscribers: bogdan, yql, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D12425 --- src/yb/docdb/pgsql_operation.cc | 10 + src/yb/tablet/tablet.cc | 319 +++++++++++++----- src/yb/tablet/tablet.h | 19 +- src/yb/tserver/tablet_service.cc | 3 +- .../yql/pgwrapper/pg_index_backfill-test.cc | 180 +++++++++- 5 files changed, 415 insertions(+), 116 deletions(-) diff --git a/src/yb/docdb/pgsql_operation.cc b/src/yb/docdb/pgsql_operation.cc index 7aac0b042bba..6ba2d64af158 100644 --- a/src/yb/docdb/pgsql_operation.cc +++ b/src/yb/docdb/pgsql_operation.cc @@ -137,6 +137,7 @@ Result CreateIterator( CoarseTimePoint deadline, const ReadHybridTime& read_time, bool is_explicit_request_read_time) { + VLOG_IF(2, request.is_for_backfill()) << "Creating iterator for " << yb::ToString(request); common::YQLRowwiseIteratorIf::UniPtr result; // TODO(neil) Remove the following IF block when it is completely obsolete. 
@@ -167,6 +168,15 @@ Result CreateIterator( actual_read_time.read = start_sub_doc_key.hybrid_time(); } } + } else if (request.is_for_backfill()) { + RSTATUS_DCHECK(is_explicit_request_read_time, InvalidArgument, + "Backfill request should already be using explicit read times."); + PgsqlBackfillSpecPB spec; + spec.ParseFromString(a2b_hex(request.backfill_spec())); + if (!spec.next_row_key().empty()) { + KeyBytes start_key_bytes(spec.next_row_key()); + RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); + } } RETURN_NOT_OK(ql_storage.GetIterator( request, projection, schema, txn_op_context, diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 1e388f7a6967..a653129059cc 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -1824,6 +1824,40 @@ Result Tablet::HasScanReachedMaxPartitionKey( return false; } +namespace { + +void SetBackfillSpecForYsqlBackfill( + const PgsqlReadRequestPB& pgsql_read_request, + const size_t& row_count, + PgsqlResponsePB* response) { + PgsqlBackfillSpecPB in_spec; + in_spec.ParseFromString(a2b_hex(pgsql_read_request.backfill_spec())); + + auto limit = in_spec.limit(); + PgsqlBackfillSpecPB out_spec; + out_spec.set_limit(limit); + out_spec.set_count(in_spec.count() + row_count); + response->set_is_backfill_batch_done(!response->has_paging_state()); + VLOG(2) << " limit is " << limit << " set_count to " << out_spec.count(); + if (limit >= 0 && out_spec.count() >= limit) { + // Hint postgres to stop scanning now. And set up the + // next_row_key based on the paging state. 
+ if (response->has_paging_state()) { + out_spec.set_next_row_key(response->paging_state().next_row_key()); + } + response->set_is_backfill_batch_done(true); + } + + VLOG(2) << "Got input spec " << yb::ToString(in_spec) + << " set output spec " << yb::ToString(out_spec) + << " batch_done=" << response->is_backfill_batch_done(); + string serialized_pb; + out_spec.SerializeToString(&serialized_pb); + response->set_backfill_spec(b2a_hex(serialized_pb)); +} + +} // namespace + CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_read_request, const size_t row_count, PgsqlResponsePB* response) const { @@ -1861,6 +1895,12 @@ CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_ response->mutable_paging_state()->set_total_num_rows_read( pgsql_read_request.paging_state().total_num_rows_read() + row_count); } + + if (pgsql_read_request.is_for_backfill()) { + // BackfillSpec is used to implement "paging" across multiple BackfillIndex + // rpcs from the master. + SetBackfillSpecForYsqlBackfill(pgsql_read_request, row_count, response); + } return Status::OK(); } @@ -2301,30 +2341,12 @@ Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperation* operation) { operation->ToString()); } -// Assume that we are already in the Backfilling mode. 
-Status Tablet::BackfillIndexesForYsql( - const std::vector& indexes, - const std::string& backfill_from, - const CoarseTimePoint deadline, - const HybridTime read_time, +namespace { + +Result ConnectToPostgres( const HostPort& pgsql_proxy_bind_address, const std::string& database_name, - const uint64_t postgres_auth_key, - std::string* backfilled_until) { - if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { - TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); - SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); - } - LOG(INFO) << "Begin " << __func__ - << " at " << read_time - << " for " << AsString(indexes); - - if (!backfill_from.empty()) { - return STATUS( - InvalidArgument, - "YSQL index backfill does not support backfill_from, yet"); - } - + const uint64_t postgres_auth_key) { // Construct connection string. Note that the plain password in the connection string will be // sent over the wire, but since it only goes over a unix-domain socket, there should be no // eavesdropping/tampering issues. @@ -2337,34 +2359,6 @@ Status Tablet::BackfillIndexesForYsql( pgwrapper::PqEscapeLiteral(database_name)); VLOG(1) << __func__ << ": libpq connection string: " << conn_str; - // Construct query string. - std::string index_oids; - { - std::stringstream ss; - for (auto& index : indexes) { - Oid index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id())); - ss << index_oid << ","; - } - index_oids = ss.str(); - index_oids.pop_back(); - } - std::string partition_key = metadata_->partition()->partition_key_start(); - // This should be safe from injection attacks because the parameters only consist of characters - // [,0-9a-f]. 
- // TODO(jason): pass deadline - PgsqlBackfillSpecPB bfinstr_pb; - std::string bfinstr; - bfinstr_pb.set_limit(8192); - bfinstr_pb.SerializeToString(&bfinstr); - - std::string query_str = Format( - "BACKFILL INDEX $0 WITH x'$1' READ TIME $2 PARTITION x'$3';", - index_oids, - b2a_hex(bfinstr), - read_time.ToUint64(), - b2a_hex(partition_key)); - VLOG(1) << __func__ << ": libpq query string: " << query_str; - // Connect. pgwrapper::PGConnPtr conn(PQconnectdb(conn_str.c_str())); if (!conn) { @@ -2377,11 +2371,30 @@ Status Tablet::BackfillIndexesForYsql( if (msg.back() == '\n') { msg.resize(msg.size() - 1); } - LOG(WARNING) << "libpq connection \"" << conn_str - << "\" failed: " << msg; + LOG(WARNING) << "libpq connection \"" << conn_str << "\" failed: " << msg; return STATUS_FORMAT(IllegalState, "backfill connection to DB failed: $0", msg); } + return conn; +} +string GenerateSerializedBackfillSpec(size_t batch_size, const string& next_row_to_backfill) { + PgsqlBackfillSpecPB backfill_spec; + std::string serialized_backfill_spec; + // Note that although we set the desired batch_size as the limit, postgres + // has its own internal paging size of 1024 (controlled by --ysql_prefetch_limit). So the actual + // rows processed could be larger than the limit set here; unless it happens + // to be a multiple of FLAGS_ysql_prefetch_limit + backfill_spec.set_limit(batch_size); + backfill_spec.set_next_row_key(next_row_to_backfill); + backfill_spec.SerializeToString(&serialized_backfill_spec); + VLOG(2) << "Generating backfill_spec " << yb::ToString(backfill_spec) << " encoded as " + << b2a_hex(serialized_backfill_spec) << " a string of length " + << serialized_backfill_spec.length(); + return serialized_backfill_spec; +} + +Result QueryPostgresToDoBackfill( + const pgwrapper::PGConnPtr& conn, const string& query_str) { // Execute. 
pgwrapper::PGResultPtr res(PQexec(conn.get(), query_str.c_str())); if (!res) { @@ -2391,10 +2404,10 @@ Status Tablet::BackfillIndexesForYsql( if (msg.back() == '\n') { msg.resize(msg.size() - 1); } - LOG(WARNING) << "libpq query \"" << query_str - << "\" was not sent: " << msg; + LOG(WARNING) << "libpq query \"" << query_str << "\" was not sent: " << msg; return STATUS_FORMAT(IllegalState, "backfill query couldn't be sent: $0", msg); } + ExecStatusType status = PQresultStatus(res.get()); // TODO(jason): more properly handle bad statuses if (status != PGRES_TUPLES_OK) { @@ -2404,9 +2417,8 @@ Status Tablet::BackfillIndexesForYsql( if (msg.back() == '\n') { msg.resize(msg.size() - 1); } - LOG(WARNING) << "libpq query \"" << query_str - << "\" returned " << PQresStatus(status) - << ": " << msg; + LOG(WARNING) << "libpq query \"" << query_str << "\" returned " << PQresStatus(status) << ": " + << msg; return STATUS(IllegalState, msg); } @@ -2414,8 +2426,147 @@ Status Tablet::BackfillIndexesForYsql( CHECK_EQ(PQnfields(res.get()), 1); const std::string returned_spec = CHECK_RESULT(pgwrapper::GetString(res.get(), 0, 0)); VLOG(3) << "Got back " << returned_spec << " of length " << returned_spec.length(); - // TODO(jason): handle partially finished backfills. - *backfilled_until = ""; + + PgsqlBackfillSpecPB spec; + spec.ParseFromString(a2b_hex(returned_spec)); + return spec; +} + +struct BackfillParams { + explicit BackfillParams(const CoarseTimePoint& deadline) { + batch_size = GetAtomicFlag(&FLAGS_backfill_index_write_batch_size); + rate_per_sec = GetAtomicFlag(&FLAGS_backfill_index_rate_rows_per_sec); + auto grace_margin_ms = GetAtomicFlag(&FLAGS_backfill_index_timeout_grace_margin_ms); + if (grace_margin_ms < 0) { + // We need: grace_margin_ms >= 1000 * batch_size / rate_per_sec; + // By default, we will set it to twice the minimum value + 1s. + grace_margin_ms = (rate_per_sec > 0 ? 
1000 * (1 + 2.0 * batch_size / rate_per_sec) : 1000); + YB_LOG_EVERY_N(INFO, 100000) << "Using grace margin of " << grace_margin_ms << "ms"; + } + modified_deadline = deadline - grace_margin_ms * 1ms; + start_time = CoarseMonoClock::Now(); + } + + CoarseTimePoint start_time; + CoarseTimePoint modified_deadline; + size_t rate_per_sec; + size_t batch_size; +}; + +// Slow down before the next batch to throttle the rate of processing. +void MaybeSleepToThrottleBackfill( + const CoarseTimePoint& start_time, + size_t number_of_rows_processed) { + if (FLAGS_backfill_index_rate_rows_per_sec <= 0) { + return; + } + + auto now = CoarseMonoClock::Now(); + auto duration_for_rows_processed = MonoDelta(now - start_time); + auto expected_time_for_processing_rows = MonoDelta::FromMilliseconds( + number_of_rows_processed * 1000 / FLAGS_backfill_index_rate_rows_per_sec); + DVLOG(3) << "Duration since last batch " << duration_for_rows_processed << " expected duration " + << expected_time_for_processing_rows << " extra time to sleep: " + << expected_time_for_processing_rows - duration_for_rows_processed; + if (duration_for_rows_processed < expected_time_for_processing_rows) { + SleepFor(expected_time_for_processing_rows - duration_for_rows_processed); + } +} + +bool CanProceedToBackfillMoreRows( + const BackfillParams& backfill_params, + size_t number_of_rows_processed) { + auto now = CoarseMonoClock::Now(); + if (now > backfill_params.modified_deadline || + (FLAGS_TEST_backfill_paging_size > 0 && + number_of_rows_processed >= FLAGS_TEST_backfill_paging_size)) { + // We are done if we are out of time. + // Or, if for testing purposes we have a bound on the size of batches to process. + return false; + } + return true; +} + +bool CanProceedToBackfillMoreRows( + const BackfillParams& backfill_params, + const string& backfilled_until, + size_t number_of_rows_processed) { + if (backfilled_until.empty()) { + // The backfill is done for this tablet. No need to do another batch. 
+ return false; + } + + return CanProceedToBackfillMoreRows(backfill_params, number_of_rows_processed); +} + +} // namespace + +// Assume that we are already in the Backfilling mode. +Status Tablet::BackfillIndexesForYsql( + const std::vector& indexes, + const std::string& backfill_from, + const CoarseTimePoint deadline, + const HybridTime read_time, + const HostPort& pgsql_proxy_bind_address, + const std::string& database_name, + const uint64_t postgres_auth_key, + size_t* number_of_rows_processed, + std::string* backfilled_until) { + if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { + TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); + } + LOG(INFO) << "Begin " << __func__ << " at " << read_time << " from " + << (backfill_from.empty() ? "" : strings::b2a_hex(backfill_from)) + << " for " << AsString(indexes); + *backfilled_until = backfill_from; + pgwrapper::PGConnPtr conn = + VERIFY_RESULT(ConnectToPostgres(pgsql_proxy_bind_address, database_name, postgres_auth_key)); + + // Construct query string. + std::string index_oids; + { + std::stringstream ss; + for (auto& index : indexes) { + Oid index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id())); + ss << index_oid << ","; + } + index_oids = ss.str(); + index_oids.pop_back(); + } + std::string partition_key = metadata_->partition()->partition_key_start(); + + BackfillParams backfill_params(deadline); + *number_of_rows_processed = 0; + do { + std::string serialized_backfill_spec = + GenerateSerializedBackfillSpec(backfill_params.batch_size, *backfilled_until); + + // This should be safe from injection attacks because the parameters only consist of characters + // [,0-9a-f]. 
+ std::string query_str = Format( + "BACKFILL INDEX $0 WITH x'$1' READ TIME $2 PARTITION x'$3';", + index_oids, + b2a_hex(serialized_backfill_spec), + read_time.ToUint64(), + b2a_hex(partition_key)); + VLOG(1) << __func__ << ": libpq query string: " << query_str; + + PgsqlBackfillSpecPB spec = VERIFY_RESULT(QueryPostgresToDoBackfill(conn, query_str)); + *number_of_rows_processed += spec.count(); + *backfilled_until = spec.next_row_key(); + + VLOG(2) << "Backfilled " << *number_of_rows_processed << " rows. " + << "Setting backfilled_until to " << b2a_hex(*backfilled_until) << " of length " + << backfilled_until->length(); + + MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); + } while (CanProceedToBackfillMoreRows( + backfill_params, *backfilled_until, *number_of_rows_processed)); + + VLOG(1) << "Backfilled " << *number_of_rows_processed << " rows. " + << "Set backfilled_until to " + << (backfilled_until->empty() ? "(empty)" : b2a_hex(*backfilled_until)); return Status::OK(); } @@ -2514,7 +2665,7 @@ Status Tablet::BackfillIndexes( const std::string& backfill_from, const CoarseTimePoint deadline, const HybridTime read_time, - int* number_of_rows_processed, + size_t* number_of_rows_processed, std::string* backfilled_until, std::unordered_set* failed_indexes) { if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { @@ -2531,18 +2682,9 @@ Status Tablet::BackfillIndexes( VERIFY_RESULT(NewRowIterator(projection, ReadHybridTime::SingleTime(read_time))); QLTableRow row; std::vector> index_requests; - auto grace_margin_ms = GetAtomicFlag(&FLAGS_backfill_index_timeout_grace_margin_ms); - if (grace_margin_ms < 0) { - const auto rate_per_sec = GetAtomicFlag(&FLAGS_backfill_index_rate_rows_per_sec); - const auto batch_size = GetAtomicFlag(&FLAGS_backfill_index_write_batch_size); - // We need: grace_margin_ms >= 1000 * batch_size / rate_per_sec; - // By default, we will set it to twice the minimum value + 1s. 
- grace_margin_ms = (rate_per_sec > 0 ? 1000 * (1 + 2.0 * batch_size / rate_per_sec) : 1000); - YB_LOG_EVERY_N(INFO, 100000) << "Using grace margin of " << grace_margin_ms << "ms"; - } - const yb::CoarseDuration kMargin = grace_margin_ms * 1ms; + + BackfillParams backfill_params{deadline}; constexpr auto kProgressInterval = 1000; - CoarseTimePoint last_flushed_at; if (!backfill_from.empty()) { VLOG(1) << "Resuming backfill from " << b2a_hex(backfill_from); @@ -2560,9 +2702,7 @@ Status Tablet::BackfillIndexes( *backfilled_until = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); } - if (CoarseMonoClock::Now() + kMargin > deadline || - (FLAGS_TEST_backfill_paging_size > 0 && - *number_of_rows_processed == FLAGS_TEST_backfill_paging_size)) { + if (!CanProceedToBackfillMoreRows(backfill_params, *number_of_rows_processed)) { resume_backfill_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); break; } @@ -2585,10 +2725,11 @@ Status Tablet::BackfillIndexes( } DVLOG(2) << "Building index for fetched row: " << row.ToString(); - RETURN_NOT_OK(UpdateIndexInBatches( - row, indexes, read_time, &index_requests, &last_flushed_at, failed_indexes)); + RETURN_NOT_OK(UpdateIndexInBatches(row, indexes, read_time, &index_requests, failed_indexes)); + if (++(*number_of_rows_processed) % kProgressInterval == 0) { VLOG(1) << "Processed " << *number_of_rows_processed << " rows"; + MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); } } @@ -2602,9 +2743,9 @@ Status Tablet::BackfillIndexes( << " rows were dropped in index backfill."; } - RETURN_NOT_OK(FlushWriteIndexBatchIfRequired( - /* forced */ true, read_time, &index_requests, &last_flushed_at, failed_indexes)); - + VLOG(1) << "Processed " << *number_of_rows_processed << " rows"; + RETURN_NOT_OK(FlushWriteIndexBatch(read_time, &index_requests, failed_indexes)); + MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); *backfilled_until = resume_backfill_from; LOG(INFO) << "Done 
BackfillIndexes at " << read_time << " for " << AsString(index_ids) << " until " @@ -2617,7 +2758,6 @@ Status Tablet::UpdateIndexInBatches( const std::vector& indexes, const HybridTime write_time, std::vector>* index_requests, - CoarseTimePoint* last_flushed_at, std::unordered_set* failed_indexes) { const QLTableRow& kEmptyRow = QLTableRow::empty_row(); QLExprExecutor expr_executor; @@ -2633,7 +2773,7 @@ Status Tablet::UpdateIndexInBatches( // Update the index write op. return FlushWriteIndexBatchIfRequired( - false, write_time, index_requests, last_flushed_at, failed_indexes); + write_time, index_requests, failed_indexes); } Result> Tablet::GetSessionForVerifyOrBackfill() { @@ -2648,16 +2788,22 @@ Result> Tablet::GetSessionForVerifyOrBackfill() { } Status Tablet::FlushWriteIndexBatchIfRequired( - bool force_flush, const HybridTime write_time, std::vector>* index_requests, - CoarseTimePoint* last_flushed_at, std::unordered_set* failed_indexes) { - if (!force_flush && index_requests->size() < FLAGS_backfill_index_write_batch_size) { + if (index_requests->size() < FLAGS_backfill_index_write_batch_size) { return Status::OK(); } + return FlushWriteIndexBatch(write_time, index_requests, failed_indexes); +} - if (!YBMetaDataCache()) { +Status Tablet::FlushWriteIndexBatch( + const HybridTime write_time, + std::vector>* index_requests, + std::unordered_set* failed_indexes) { + if (!client_future_.valid()) { + return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id()); + } else if (!YBMetaDataCache()) { return STATUS(IllegalState, "Table metadata cache is not present for index update"); } std::shared_ptr session = VERIFY_RESULT(GetSessionForVerifyOrBackfill()); @@ -2697,9 +2843,6 @@ Status Tablet::FlushWriteIndexBatchIfRequired( (!ops_by_primary_key.empty() ? 
ops_by_primary_key.size() : write_ops.size())); RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes)); - - SleepToThrottleRate(index_requests, FLAGS_backfill_index_rate_rows_per_sec, last_flushed_at); - *last_flushed_at = CoarseMonoClock::Now(); index_requests->clear(); return Status::OK(); diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 99c9123cff44..8f811da51f1e 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -180,12 +180,10 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { // Performs backfill for the key range beginning from the row immediately after // , until either it reaches the end of the tablet // or the current time is past deadline. + // * will be set to the number of rows backfilled. // will be set to the first row that was not backfilled, so that the // next API call can resume from where the backfill was left off. // Note that only applies to the non-failing indexes. - // - // TODO(#5326): For now YSQL does not support chunking, so backfill will always run to the - // end of the tablet and set backfilled_until as the empty string. CHECKED_STATUS BackfillIndexesForYsql( const std::vector& indexes, const std::string& backfill_from, @@ -194,6 +192,7 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { const HostPort& pgsql_proxy_bind_address, const std::string& database_name, const uint64_t postgres_auth_key, + size_t* number_of_rows_processed, std::string* backfilled_until); CHECKED_STATUS VerifyIndexTableConsistencyForCQL( @@ -246,17 +245,18 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { // Performs backfill for the key range beginning from the row , // until either it reaches the end of the tablet // or the current time is past deadline. - // will be updated with the collection of index-ids for which any errors - // were encountered. + // * will be set to the number of rows backfilled. 
// will be set to the first row that was not backfilled, so that the // next API call can resume from where the backfill was left off. // Note that only applies to the non-failing indexes. + // will be updated with the collection of index-ids for which any errors + // were encountered. CHECKED_STATUS BackfillIndexes( const std::vector& indexes, const std::string& backfill_from, const CoarseTimePoint deadline, const HybridTime read_time, - int* number_of_rows_processed, + size_t* number_of_rows_processed, std::string* backfilled_until, std::unordered_set* failed_indexes); @@ -265,16 +265,17 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { const std::vector& indexes, const HybridTime write_time, std::vector>* index_requests, - CoarseTimePoint* last_flushed_at, std::unordered_set* failed_indexes); Result> GetSessionForVerifyOrBackfill(); CHECKED_STATUS FlushWriteIndexBatchIfRequired( - bool force_flush, const HybridTime write_time, std::vector>* index_requests, - CoarseTimePoint* last_flushed_at, + std::unordered_set* failed_indexes); + CHECKED_STATUS FlushWriteIndexBatch( + const HybridTime write_time, + std::vector>* index_requests, std::unordered_set* failed_indexes); template diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 89f290d65a38..2afefcc13ccc 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -776,7 +776,7 @@ void TabletServiceAdminImpl::BackfillIndex( Status backfill_status; std::string backfilled_until; std::unordered_set failed_indexes; - int number_rows_processed = 0; + size_t number_rows_processed = 0; if (is_pg_table) { if (!req->has_namespace_name()) { SetupErrorAndRespond( @@ -796,6 +796,7 @@ void TabletServiceAdminImpl::BackfillIndex( server_->pgsql_proxy_bind_address(), req->namespace_name(), server_->GetSharedMemoryPostgresAuthKey(), + &number_rows_processed, &backfilled_until); if (backfill_status.IsIllegalState()) { 
DCHECK_EQ(failed_indexes.size(), 0) << "We don't support batching in YSQL yet"; diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index c1f969ee2931..67b3a71843a3 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -11,6 +11,7 @@ // under the License. #include +#include #include #include #include @@ -53,12 +54,18 @@ class PgIndexBackfillTest : public LibPqTestBase { void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { options->extra_master_flags.push_back("--ysql_disable_index_backfill=false"); + options->extra_master_flags.push_back( + Format("--ysql_num_shards_per_tserver=$0", kTabletsPerServer)); options->extra_tserver_flags.push_back("--ysql_disable_index_backfill=false"); + options->extra_tserver_flags.push_back( + Format("--ysql_num_shards_per_tserver=$0", kTabletsPerServer)); } protected: void TestSimpleBackfill(const std::string& table_create_suffix = ""); + void TestLargeBackfill(const int num_rows); void TestRetainDeleteMarkers(const std::string& db_name); + const int kTabletsPerServer = 8; std::unique_ptr conn_; }; @@ -78,6 +85,27 @@ Result GetTableIdByTableName( return STATUS(NotFound, "The table does not exist"); } +Result TotalBackfillRpcMetric(ExternalMiniCluster* cluster, const char* type) { + int total_rpc_calls = 0; + constexpr auto metric_name = "handler_latency_yb_tserver_TabletServerAdminService_BackfillIndex"; + for (auto ts : cluster->tserver_daemons()) { + auto val = VERIFY_RESULT(ts->GetInt64Metric("server", "yb.tabletserver", metric_name, type)); + total_rpc_calls += val; + VLOG(1) << ts->bind_host() << " for " << type << " returned " << val; + } + return total_rpc_calls; +} + +Result TotalBackfillRpcCalls(ExternalMiniCluster* cluster) { + return TotalBackfillRpcMetric(cluster, "total_count"); +} + +Result AvgBackfillRpcLatencyInMicros(ExternalMiniCluster* cluster) { + auto num_calls = 
VERIFY_RESULT(TotalBackfillRpcMetric(cluster, "total_count")); + double total_latency = VERIFY_RESULT(TotalBackfillRpcMetric(cluster, "total_sum")); + return total_latency / num_calls; +} + } // namespace void PgIndexBackfillTest::TestSimpleBackfill(const std::string& table_create_suffix) { @@ -128,6 +156,30 @@ void PgIndexBackfillTest::TestRetainDeleteMarkers(const std::string& db_name) { ASSERT_FALSE(table_info->schema.table_properties().retain_delete_markers()); } +void PgIndexBackfillTest::TestLargeBackfill(const int num_rows) { + ASSERT_OK(conn_->ExecuteFormat("CREATE TABLE $0 (i int)", kTableName)); + + // Insert bunch of rows. + ASSERT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series(1, $1))", + kTableName, + num_rows)); + + // Create index. + ASSERT_OK(conn_->ExecuteFormat("CREATE INDEX ON $0 (i ASC)", kTableName)); + + // All rows should be in the index. + const std::string query = Format( + "SELECT COUNT(*) FROM $0 WHERE i > 0", + kTableName); + ASSERT_TRUE(ASSERT_RESULT(conn_->HasIndexScan(query))); + PGResultPtr res = ASSERT_RESULT(conn_->Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), 1); + ASSERT_EQ(PQnfields(res.get()), 1); + int actual_num_rows = ASSERT_RESULT(GetInt64(res.get(), 0, 0)); + ASSERT_EQ(actual_num_rows, num_rows); +} + // Make sure that backfill works. TEST_F(PgIndexBackfillTest, YB_DISABLE_TEST_IN_TSAN(Simple)) { TestSimpleBackfill(); @@ -321,28 +373,120 @@ TEST_F(PgIndexBackfillTest, YB_DISABLE_TEST_IN_TSAN(NonexistentDelete)) { // Make sure that index backfill on large tables backfills all data. 
TEST_F(PgIndexBackfillTest, YB_DISABLE_TEST_IN_TSAN(Large)) { constexpr int kNumRows = 10000; + TestLargeBackfill(kNumRows); + int expected_calls = cluster_->num_tablet_servers() * kTabletsPerServer; + auto actual_calls = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + ASSERT_EQ(actual_calls, expected_calls); +} - ASSERT_OK(conn_->ExecuteFormat("CREATE TABLE $0 (i int)", kTableName)); +class PgIndexBackfillTestChunking : public PgIndexBackfillTest { + protected: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillTest::UpdateMiniClusterOptions(options); + options->extra_tserver_flags.push_back( + Format("--TEST_backfill_paging_size=$0", kBatchSize)); + options->extra_tserver_flags.push_back( + Format("--backfill_index_write_batch_size=$0", kBatchSize)); + options->extra_tserver_flags.push_back( + Format("--ysql_prefetch_limit=$0", kPrefetchSize)); + } + const int kBatchSize = 200; + const int kPrefetchSize = 128; +}; - // Insert bunch of rows. - ASSERT_OK(conn_->ExecuteFormat( - "INSERT INTO $0 VALUES (generate_series(1, $1))", - kTableName, - kNumRows)); +// Set batch size and prefetch limit such that: +// Each tablet requires multiple RPC calls from the master to complete backfill. +// Also, set the ysql_prefetch_size small to ensure that each of these +// `BACKFILL INDEX` calls will fetch data from the tserver at least 2 times. +// Fetch metrics to ensure that there have been > num_tablets rpc's. +TEST_F_EX( + PgIndexBackfillTest, YB_DISABLE_TEST_IN_TSAN(BackfillInChunks), PgIndexBackfillTestChunking) { + constexpr int kNumRows = 10000; + TestLargeBackfill(kNumRows); + + const size_t effective_batch_size = + static_cast(kPrefetchSize * ceil(1.0 * kBatchSize / kPrefetchSize)); + const size_t min_expected_calls = + static_cast(ceil(1.0 * kNumRows / effective_batch_size)); + auto actual_calls = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "Had " << actual_calls << " backfill rpc calls. 
" + << "Expected at least " << kNumRows << "/" << effective_batch_size << " = " + << min_expected_calls; + ASSERT_GE(actual_calls, min_expected_calls); +} - // Create index. - ASSERT_OK(conn_->ExecuteFormat("CREATE INDEX ON $0 (i ASC)", kTableName)); +class PgIndexBackfillTestThrottled : public PgIndexBackfillTest { + protected: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillTest::UpdateMiniClusterOptions(options); + options->extra_tserver_flags.push_back("--ysql_prefetch_limit=128"); + options->extra_tserver_flags.push_back("--backfill_index_write_batch_size=256"); + options->extra_tserver_flags.push_back( + Format("--backfill_index_rate_rows_per_sec=$0", kBackfillRateRowsPerSec)); + options->extra_tserver_flags.push_back( + Format("--num_concurrent_backfills_allowed=$0", kNumConcurrentBackfills)); + } - // All rows should be in the index. - const std::string query = Format( - "SELECT COUNT(*) FROM $0 WHERE i > 0", - kTableName); - ASSERT_TRUE(ASSERT_RESULT(conn_->HasIndexScan(query))); - PGResultPtr res = ASSERT_RESULT(conn_->Fetch(query)); - ASSERT_EQ(PQntuples(res.get()), 1); - ASSERT_EQ(PQnfields(res.get()), 1); - int actual_num_rows = ASSERT_RESULT(GetInt64(res.get(), 0, 0)); - ASSERT_EQ(actual_num_rows, kNumRows); + const int kBackfillRateRowsPerSec = 100; + const int kNumConcurrentBackfills = 1; + const int kBackfillRpcDeadlineMs = 10000; +}; + +// Set the backfill batch size and backfill rate +// Check that the time taken to backfill is no less than what is expected. 
+TEST_F_EX( + PgIndexBackfillTest, YB_DISABLE_TEST_IN_TSAN(ThrottledBackfill), PgIndexBackfillTestThrottled) { + constexpr int kNumRows = 10000; + auto start_time = CoarseMonoClock::Now(); + TestLargeBackfill(kNumRows); + auto end_time = CoarseMonoClock::Now(); + auto expected_time = MonoDelta::FromSeconds( + kNumRows * 1.0 / + (cluster_->num_tablet_servers() * kNumConcurrentBackfills * kBackfillRateRowsPerSec)); + ASSERT_GE(MonoDelta{end_time - start_time}, expected_time); + + // Expect only 1 call per tablet + const size_t expected_calls = cluster_->num_tablet_servers() * kTabletsPerServer; + auto actual_calls = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + ASSERT_EQ(actual_calls, expected_calls); + + auto avg_rpc_latency_usec = ASSERT_RESULT(AvgBackfillRpcLatencyInMicros(cluster_.get())); + LOG(INFO) << "Avg backfill latency was " << avg_rpc_latency_usec << " us"; + // --ysql_index_backfill_rpc_timeout was not set for this test. + // Check to ensure that we have picked a reasonable value for kBackfillRpcDeadlineMs + // such that BackfillRespectsDeadline will only succeed if throttling is implemented. + ASSERT_GT(avg_rpc_latency_usec, kBackfillRpcDeadlineMs * 1000); +} + +class PgIndexBackfillTestDeadlines : public PgIndexBackfillTestThrottled { + protected: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillTestThrottled::UpdateMiniClusterOptions(options); + options->extra_master_flags.push_back( + Format("--ysql_index_backfill_rpc_timeout_ms=$0", kBackfillRpcDeadlineMs)); + options->extra_master_flags.push_back( + Format("--backfill_index_timeout_grace_margin_ms=$0", kBackfillRpcDeadlineMs / 2)); + } +}; + +// Set the backfill batch size, backfill rate and a low timeout for backfill rpc. +// Ensure that the backfill is completed. And that the avg rpc latency is +// below what is set as the timeout. 
+TEST_F_EX( + PgIndexBackfillTest, + YB_DISABLE_TEST_IN_TSAN(BackfillRespectsDeadline), + PgIndexBackfillTestDeadlines) { + constexpr int kNumRows = 10000; + TestLargeBackfill(kNumRows); + + const size_t min_expected_calls = static_cast( + ceil(kNumRows / (kBackfillRpcDeadlineMs * kBackfillRateRowsPerSec * 0.001))); + auto actual_calls = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + ASSERT_GE(actual_calls, min_expected_calls); + + auto avg_rpc_latency_usec = ASSERT_RESULT(AvgBackfillRpcLatencyInMicros(cluster_.get())); + LOG(INFO) << "Avg backfill latency was " << avg_rpc_latency_usec << " us"; + ASSERT_LE(avg_rpc_latency_usec, kBackfillRpcDeadlineMs * 1000); } // Make sure that CREATE INDEX NONCONCURRENTLY doesn't use backfill.