diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc
index 6729bc621cd5..2c3317d7166c 100644
--- a/src/yb/client/client.cc
+++ b/src/yb/client/client.cc
@@ -498,20 +498,27 @@ Status YBClient::IsAlterTableInProgress(const YBTableName& table_name,
   return data_->IsAlterTableInProgress(this, table_name, table_id, deadline, alter_in_progress);
 }
-Status YBClient::GetTableSchema(const YBTableName& table_name,
-                                YBSchema* schema,
-                                PartitionSchema* partition_schema) {
-  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
+Result<YBTableInfo> YBClient::GetYBTableInfo(const YBTableName& table_name) {
   YBTableInfo info;
+  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
   RETURN_NOT_OK(data_->GetTableSchema(this, table_name, deadline, &info));
+  return info;
+}
+Status YBClient::GetTableSchema(const YBTableName& table_name,
+                                YBSchema* schema,
+                                PartitionSchema* partition_schema) {
+  Result<YBTableInfo> info = GetYBTableInfo(table_name);
+  if (!info.ok()) {
+    return info.status();
+  }
   // Verify it is not an index table.
-  if (info.index_info) {
+  if (info->index_info) {
     return STATUS(NotFound, "The table does not exist");
   }
-  *schema = std::move(info.schema);
-  *partition_schema = std::move(info.partition_schema);
+  *schema = std::move(info->schema);
+  *partition_schema = std::move(info->partition_schema);
   return Status::OK();
 }
diff --git a/src/yb/client/client.h b/src/yb/client/client.h
index 2dab9e325031..6582bc4ad197 100644
--- a/src/yb/client/client.h
+++ b/src/yb/client/client.h
@@ -294,6 +294,7 @@ class YBClient {
   CHECKED_STATUS GetTableSchema(const YBTableName& table_name,
                                 YBSchema* schema,
                                 PartitionSchema* partition_schema);
+  Result<YBTableInfo> GetYBTableInfo(const YBTableName& table_name);
   CHECKED_STATUS GetTableSchemaById(const TableId& table_id,
                                     std::shared_ptr<YBTableInfo> info,
                                     StatusCallback callback);
@@ -625,6 +626,7 @@ class YBClient {
   FRIEND_TEST(ClientTest, TestScanTimeout);
   FRIEND_TEST(ClientTest, TestWriteWithDeadMaster);
   FRIEND_TEST(MasterFailoverTest, DISABLED_TestPauseAfterCreateTableIssued);
+  FRIEND_TEST(MasterFailoverTestIndexCreation, TestPauseAfterCreateIndexIssued);
   friend std::future> LookupFirstTabletFuture(
       const YBTable* table);
diff --git a/src/yb/client/table.cc b/src/yb/client/table.cc
index ae2e19721dfe..374838fedc7b 100644
--- a/src/yb/client/table.cc
+++ b/src/yb/client/table.cc
@@ -114,9 +114,22 @@ bool YBTable::IsIndex() const {
   return info_.index_info != boost::none;
 }
+bool YBTable::IsUniqueIndex() const {
+  return info_.index_info.is_initialized() && info_.index_info->is_unique();
+}
+
 const IndexInfo& YBTable::index_info() const {
-  CHECK(info_.index_info);
-  return *info_.index_info;
+  static IndexInfo kEmptyIndexInfo;
+  if (info_.index_info) {
+    return *info_.index_info;
+  }
+  return kEmptyIndexInfo;
+}
+
+std::string YBTable::ToString() const {
+  return strings::Substitute(
+      "$0 $1 IndexInfo: $2 IndexMap $3", (IsIndex() ? "Index Table" : "Normal Table"), id(),
+      yb::ToString(index_info()), yb::ToString(index_map()));
 }
 const PartitionSchema& YBTable::partition_schema() const {
diff --git a/src/yb/client/table.h b/src/yb/client/table.h
index 9605363d1adc..5a948afc3df3 100644
--- a/src/yb/client/table.h
+++ b/src/yb/client/table.h
@@ -86,9 +86,12 @@ class YBTable : public std::enable_shared_from_this<YBTable> {
   // Is this an index?
   bool IsIndex() const;
+  bool IsUniqueIndex() const;
+
   // For index table: information about this index.
const IndexInfo& index_info() const; + std::string ToString() const; //------------------------------------------------------------------------------------------------ // CQL support // Create a new QL operation for this table. diff --git a/src/yb/client/table_creator.cc b/src/yb/client/table_creator.cc index 4472f23619c8..3b397bb762cf 100644 --- a/src/yb/client/table_creator.cc +++ b/src/yb/client/table_creator.cc @@ -160,6 +160,11 @@ YBTableCreator& YBTableCreator::wait(bool wait) { return *this; } +YBTableCreator& YBTableCreator::TEST_use_old_style_create_request() { + TEST_use_old_style_create_request_ = true; + return *this; +} + Status YBTableCreator::Create() { const char *object_type = index_info_.has_indexed_table_id() ? "index" : "table"; if (table_name_.table_name().empty()) { @@ -234,7 +239,9 @@ Status YBTableCreator::Create() { // Index mapping with data-table being indexed. if (index_info_.has_indexed_table_id()) { - req.mutable_index_info()->CopyFrom(index_info_); + if (!TEST_use_old_style_create_request_) { + req.mutable_index_info()->CopyFrom(index_info_); + } // For compatibility reasons, set the old fields just in case we have new clients talking to // old master server during rolling upgrade. diff --git a/src/yb/client/table_creator.h b/src/yb/client/table_creator.h index 508818b56b52..ad2b9f2746b5 100644 --- a/src/yb/client/table_creator.h +++ b/src/yb/client/table_creator.h @@ -87,6 +87,9 @@ class YBTableCreator { // For index table: sets the indexed table id of this index. YBTableCreator& indexed_table_id(const std::string& id); + // For index table: uses the old style request without index_info. + YBTableCreator& TEST_use_old_style_create_request(); + // For index table: sets whether this is a local index. YBTableCreator& is_local_index(bool is_local_index); @@ -161,6 +164,8 @@ class YBTableCreator { // the data-table being indexed. IndexInfoPB index_info_; + bool TEST_use_old_style_create_request_ = false; + MonoDelta timeout_; bool wait_ = true; diff --git a/src/yb/common/common.proto b/src/yb/common/common.proto index 12fa748cdfa8..754740e52e6b 100644 --- a/src/yb/common/common.proto +++ b/src/yb/common/common.proto @@ -147,6 +147,14 @@ enum PermissionType { ALL_PERMISSION = 999999999; } +enum IndexPermissions { + INDEX_PERM_DELETE_ONLY = 0; + INDEX_PERM_WRITE_AND_DELETE = 2; + INDEX_PERM_DO_BACKFILL = 4; + INDEX_PERM_READ_WRITE_AND_DELETE = 6; + INDEX_PERM_BACKFILL_FAILED = 8; +} + // The type used in column schemas, which may have type parameters (i.e. for collections) // If types have parameters, they are stored in the params field. Otherwise params is empty. // e.g. (using lower case of QLTypes and upper case for DataType): @@ -225,6 +233,7 @@ message TablePropertiesPB { optional bool use_mangled_column_name = 6 [ default = false ]; optional int32 num_tablets = 7 [ default = 0 ]; optional bool is_ysql_catalog_table = 8 [ default = false ]; + optional bool is_backfilling = 9 [ default = false ]; } message SchemaPB { @@ -248,6 +257,9 @@ message IndexInfoPB { optional uint32 version = 2 [ default = 0]; // Index table's schema version. optional bool is_local = 3 [ default = false ]; // Whether the index is a local index optional bool is_unique = 7 [ default = false ]; // Whether the index is a unique index + // We should only have this in the elements of "repeated IndexInfoPB indexes" of the + // SysTableEntryPB of the main table. + optional IndexPermissions index_permissions = 12 [ default = INDEX_PERM_READ_WRITE_AND_DELETE ]; // Index column mapping. 
// "colexpr" is used to compute the value of this column in an INDEX. diff --git a/src/yb/common/index.cc b/src/yb/common/index.cc index ada3e766ad18..633e542cbde4 100644 --- a/src/yb/common/index.cc +++ b/src/yb/common/index.cc @@ -75,6 +75,7 @@ IndexInfo::IndexInfo(const IndexInfoPB& pb) range_column_count_(pb.range_column_count()), indexed_hash_column_ids_(ColumnIdsFromPB(pb.indexed_hash_column_ids())), indexed_range_column_ids_(ColumnIdsFromPB(pb.indexed_range_column_ids())), + index_permissions_(pb.index_permissions()), use_mangled_column_name_(pb.use_mangled_column_name()) { for (const IndexInfo::IndexColumn &index_col : columns_) { covered_column_ids_.insert(index_col.indexed_column_id); @@ -99,6 +100,7 @@ void IndexInfo::ToPB(IndexInfoPB* pb) const { pb->add_indexed_range_column_ids(id); } pb->set_use_mangled_column_name(use_mangled_column_name_); + pb->set_index_permissions(index_permissions_); } vector IndexInfo::index_key_column_ids() const { diff --git a/src/yb/common/index.h b/src/yb/common/index.h index 816f25ebaa1e..42cdedc0e8b2 100644 --- a/src/yb/common/index.h +++ b/src/yb/common/index.h @@ -32,7 +32,7 @@ class IndexInfo { // Index column mapping. struct IndexColumn { ColumnId column_id; // Column id in the index table. - string column_name; // Column name in the index table - colexpr.MangledName(). + std::string column_name; // Column name in the index table - colexpr.MangledName(). ColumnId indexed_column_id; // Corresponding column id in indexed table. QLExpressionPB colexpr; // Index expression. @@ -78,10 +78,35 @@ class IndexInfo { // Check if this INDEX contain the column being referenced by the given selected expression. // - If found, return the location of the column (columns_[loc]). // - Otherwise, return -1. - int32_t IsExprCovered(const string& expr_content) const; + int32_t IsExprCovered(const std::string& expr_content) const; + + // Are read operations allowed to use the index? Reads are not allowed until + // the index backfill is successfully completed. + bool AllowReads() const { return index_permissions_ == INDEX_PERM_READ_WRITE_AND_DELETE; } + + // Should write operations to the index be allowed to update it. + bool AllowWrites() const { + return index_permissions_ >= INDEX_PERM_WRITE_AND_DELETE && + index_permissions_ <= INDEX_PERM_READ_WRITE_AND_DELETE; + } + + // Should delete operations to the index be allowed to update it. + bool AllowDelete() const { + return index_permissions_ >= INDEX_PERM_DELETE_ONLY && + index_permissions_ <= INDEX_PERM_READ_WRITE_AND_DELETE; + } + + // Is the index being backfilled. + bool Backfilling() const { return index_permissions_ == INDEX_PERM_DO_BACKFILL; } + + std::string ToString() const { + IndexInfoPB pb; + ToPB(&pb); + return yb::ToString(pb); + } // Same as "IsExprCovered" but only search the key columns. - int32_t FindKeyIndex(const string& key_name) const; + int32_t FindKeyIndex(const std::string& key_name) const; bool use_mangled_column_name() const { return use_mangled_column_name_; @@ -98,6 +123,7 @@ class IndexInfo { const size_t range_column_count_ = 0; // Number of range columns in the index. const std::vector indexed_hash_column_ids_; // Hash column ids in the indexed table. const std::vector indexed_range_column_ids_; // Range column ids in the indexed table. + const IndexPermissions index_permissions_ = INDEX_PERM_READ_WRITE_AND_DELETE; // Column ids covered by the index (include indexed columns). 
std::unordered_set covered_column_ids_; diff --git a/src/yb/common/pgsql_resultset.h b/src/yb/common/pgsql_resultset.h index b445c643a5e5..68e89ade5f0e 100644 --- a/src/yb/common/pgsql_resultset.h +++ b/src/yb/common/pgsql_resultset.h @@ -44,7 +44,7 @@ class PgsqlRSRowDesc { return ql_type_->main(); } private: - const string name_; + const std::string name_; QLType::SharedPtr ql_type_; }; diff --git a/src/yb/common/ql_type.h b/src/yb/common/ql_type.h index 7d26509fe382..e471e68a574f 100644 --- a/src/yb/common/ql_type.h +++ b/src/yb/common/ql_type.h @@ -37,15 +37,15 @@ class UDTypeInfo { : keyspace_name_(keyspace_name), name_(name) { } - const string& keyspace_name() const { + const std::string& keyspace_name() const { return keyspace_name_; } - const string& name() const { + const std::string& name() const { return name_; } - const string& id() const { + const std::string& id() const { return id_; } @@ -53,7 +53,7 @@ class UDTypeInfo { return field_names_; } - const string& field_name(int index) const { + const std::string& field_name(int index) const { return field_names_[index]; } @@ -150,7 +150,7 @@ class QLType { } // Constructor for user-defined types - QLType(const string& keyspace_name, const string& type_name) + QLType(const std::string& keyspace_name, const std::string& type_name) : id_(USER_DEFINED_TYPE), params_(0) { udtype_info_ = std::make_shared(keyspace_name, type_name); } @@ -231,19 +231,19 @@ class QLType { return udtype_info_->field_names(); } - const string& udtype_field_name(int index) const { + const std::string& udtype_field_name(int index) const { return udtype_info_->field_name(index); } - const string& udtype_keyspace_name() const { + const std::string& udtype_keyspace_name() const { return udtype_info_->keyspace_name(); } - const string& udtype_name() const { + const std::string& udtype_name() const { return udtype_info_->name(); } - const string& udtype_id() const { + const std::string& udtype_id() const { return udtype_info_->id(); } @@ -255,7 +255,7 @@ class QLType { } // returns position of "field_name" in udtype_field_names() vector if found, otherwise -1 - const int GetUDTypeFieldIdxByName(const string &field_name) const { + const int GetUDTypeFieldIdxByName(const std::string &field_name) const { const std::vector& field_names = udtype_field_names(); int i = 0; while (i != field_names.size()) { @@ -410,9 +410,9 @@ class QLType { //------------------------------------------------------------------------------------------------ // Logging supports. 
- const string ToString() const; + const std::string ToString() const; void ToString(std::stringstream& os) const; - static const string ToCQLString(const DataType& datatype); + static const std::string ToCQLString(const DataType& datatype); //------------------------------------------------------------------------------------------------ // static methods diff --git a/src/yb/common/schema-test.cc b/src/yb/common/schema-test.cc index e2baa159d91a..ed18f42c0605 100644 --- a/src/yb/common/schema-test.cc +++ b/src/yb/common/schema-test.cc @@ -98,7 +98,8 @@ TEST(TestSchema, TestSchema) { "]\nproperties: contain_counters: false is_transactional: false " "consistency_level: STRONG " "use_mangled_column_name: false " - "is_ysql_catalog_table: false", + "is_ysql_catalog_table: false " + "is_backfilling: false", schema.ToString()); EXPECT_EQ("key[string NOT NULL NOT A PARTITION KEY]", schema.column(0).ToString()); EXPECT_EQ("uint32 NULLABLE NOT A PARTITION KEY", schema.column(1).TypeToString()); @@ -371,7 +372,8 @@ TEST(TestSchema, TestCreateProjection) { "]\nproperties: contain_counters: false is_transactional: false " "consistency_level: STRONG " "use_mangled_column_name: false " - "is_ysql_catalog_table: false", + "is_ysql_catalog_table: false " + "is_backfilling: false", partial_schema.ToString()); // By names, with IDS @@ -383,7 +385,8 @@ TEST(TestSchema, TestCreateProjection) { "]\nproperties: contain_counters: false is_transactional: false " "consistency_level: STRONG " "use_mangled_column_name: false " - "is_ysql_catalog_table: false", + "is_ysql_catalog_table: false " + "is_backfilling: false", schema_with_ids.column_id(0), schema_with_ids.column_id(1), schema_with_ids.column_id(3)), @@ -406,7 +409,8 @@ TEST(TestSchema, TestCreateProjection) { "]\nproperties: contain_counters: false is_transactional: false " "consistency_level: STRONG " "use_mangled_column_name: false " - "is_ysql_catalog_table: false", + "is_ysql_catalog_table: false " + "is_backfilling: false", schema_with_ids.column_id(0), schema_with_ids.column_id(1), schema_with_ids.column_id(3)), diff --git a/src/yb/common/schema.cc b/src/yb/common/schema.cc index b07cdb38c032..3ed6c7e08974 100644 --- a/src/yb/common/schema.cc +++ b/src/yb/common/schema.cc @@ -96,6 +96,7 @@ void TableProperties::ToTablePropertiesPB(TablePropertiesPB *pb) const { pb->set_num_tablets(num_tablets_); } pb->set_is_ysql_catalog_table(is_ysql_catalog_table_); + pb->set_is_backfilling(is_backfilling_); } TableProperties TableProperties::FromTablePropertiesPB(const TablePropertiesPB& pb) { @@ -124,6 +125,9 @@ TableProperties TableProperties::FromTablePropertiesPB(const TablePropertiesPB& if (pb.has_is_ysql_catalog_table()) { table_properties.set_is_ysql_catalog_table(pb.is_ysql_catalog_table()); } + if (pb.has_is_backfilling()) { + table_properties.SetIsBackfilling(pb.is_backfilling()); + } return table_properties; } @@ -149,6 +153,9 @@ void TableProperties::AlterFromTablePropertiesPB(const TablePropertiesPB& pb) { if (pb.has_is_ysql_catalog_table()) { set_is_ysql_catalog_table(pb.is_ysql_catalog_table()); } + if (pb.has_is_backfilling()) { + SetIsBackfilling(pb.is_backfilling()); + } } void TableProperties::Reset() { @@ -160,6 +167,7 @@ void TableProperties::Reset() { use_mangled_column_name_ = false; num_tablets_ = 0; is_ysql_catalog_table_ = false; + is_backfilling_ = false; } string TableProperties::ToString() const { diff --git a/src/yb/common/schema.h b/src/yb/common/schema.h index 2eded522cf1f..aff506f1518b 100644 --- a/src/yb/common/schema.h +++ 
b/src/yb/common/schema.h @@ -272,7 +272,7 @@ class ColumnSchema { sorting_type_ = sorting_type; } - const string sorting_type_string() const { + const std::string sorting_type_string() const { switch (sorting_type_) { case kNotSpecified: return "none"; @@ -288,17 +288,17 @@ class ColumnSchema { LOG (FATAL) << "Invalid sorting type: " << sorting_type_; } - const string &name() const { + const std::string &name() const { return name_; } // Return a string identifying this column, including its // name. - string ToString() const; + std::string ToString() const; // Same as above, but only including the type information. // For example, "STRING NOT NULL". - string TypeToString() const; + std::string TypeToString() const; bool EqualsType(const ColumnSchema &other) const { return is_nullable_ == other.is_nullable_ && @@ -317,8 +317,8 @@ class ColumnSchema { // Stringify the given cell. This just stringifies the cell contents, // and doesn't include the column name or type. - string Stringify(const void *cell) const { - string ret; + std::string Stringify(const void *cell) const { + std::string ret; type_info()->AppendDebugStringForValue(cell, &ret); return ret; } @@ -349,11 +349,11 @@ class ColumnSchema { private: friend class SchemaBuilder; - void set_name(const string& name) { + void set_name(const std::string& name) { name_ = name; } - string name_; + std::string name_; std::shared_ptr type_; bool is_nullable_; bool is_hash_key_; @@ -454,6 +454,10 @@ class TableProperties { return is_ysql_catalog_table_; } + bool IsBackfilling() const { return is_backfilling_; } + + void SetIsBackfilling(bool is_backfilling) { is_backfilling_ = is_backfilling; } + void ToTablePropertiesPB(TablePropertiesPB *pb) const; static TableProperties FromTablePropertiesPB(const TablePropertiesPB& pb); @@ -469,6 +473,7 @@ class TableProperties { int64_t default_time_to_live_ = kNoDefaultTtl; bool contain_counters_ = false; bool is_transactional_ = false; + bool is_backfilling_ = false; YBConsistencyLevel consistency_level_ = YBConsistencyLevel::STRONG; TableId copartition_table_id_ = kNoCopartitionTableId; boost::optional wal_retention_secs_; @@ -674,6 +679,8 @@ class Schema { table_properties_.SetTransactional(is_transactional); } + void SetIsBackfilling(bool is_backfilling) { table_properties_.SetIsBackfilling(is_backfilling); } + // Return the column index corresponding to the given column, // or kColumnNotFound if the column is not in this schema. int find_column(const GStringPiece col_name) const { @@ -788,7 +795,7 @@ class Schema { // in a way suitable for debugging. This isn't currently optimized // so should be avoided in hot paths. template - string DebugRow(const RowType& row) const { + std::string DebugRow(const RowType& row) const { DCHECK_SCHEMA_EQ(*this, *row.schema()); return DebugRowColumns(row, num_columns()); } @@ -797,7 +804,7 @@ class Schema { // key-compatible with this one. Per above, this is not for use in // hot paths. template - string DebugRowKey(const RowType& row) const { + std::string DebugRowKey(const RowType& row) const { DCHECK_KEY_PROJECTION_SCHEMA_EQ(*this, *row.schema()); return DebugRowColumns(row, num_key_columns()); } @@ -822,7 +829,7 @@ class Schema { START_KEY, END_KEY }; - string DebugEncodedRowKey(Slice encoded_key, StartOrEnd start_or_end) const; + std::string DebugEncodedRowKey(Slice encoded_key, StartOrEnd start_or_end) const; // Compare two rows of this schema. template @@ -903,7 +910,7 @@ class Schema { // Stringify this Schema. 
This is not particularly efficient, // so should only be used when necessary for output. - string ToString() const; + std::string ToString() const; // Return true if the schemas have exactly the same set of columns // and respective types. @@ -1029,7 +1036,7 @@ class Schema { // row. template std::string DebugRowColumns(const RowType& row, int num_columns) const { - string ret; + std::string ret; ret.append("("); for (size_t col_idx = 0; col_idx < num_columns; col_idx++) { @@ -1127,37 +1134,37 @@ class SchemaBuilder { // assumes type is allowed in primary key -- this should be checked before getting here // using DataType (not QLType) since primary key columns only support elementary types - CHECKED_STATUS AddKeyColumn(const string& name, const std::shared_ptr& type); - CHECKED_STATUS AddKeyColumn(const string& name, DataType type); + CHECKED_STATUS AddKeyColumn(const std::string& name, const std::shared_ptr& type); + CHECKED_STATUS AddKeyColumn(const std::string& name, DataType type); // assumes type is allowed in hash key -- this should be checked before getting here // using DataType (not QLType) since hash key columns only support elementary types - CHECKED_STATUS AddHashKeyColumn(const string& name, const std::shared_ptr& type); - CHECKED_STATUS AddHashKeyColumn(const string& name, DataType type); + CHECKED_STATUS AddHashKeyColumn(const std::string& name, const std::shared_ptr& type); + CHECKED_STATUS AddHashKeyColumn(const std::string& name, DataType type); CHECKED_STATUS AddColumn(const ColumnSchema& column, bool is_key); - CHECKED_STATUS AddColumn(const string& name, const std::shared_ptr& type) { + CHECKED_STATUS AddColumn(const std::string& name, const std::shared_ptr& type) { return AddColumn(name, type, false, false, false, false, 0, ColumnSchema::SortingType::kNotSpecified); } // convenience function for adding columns with simple (non-parametric) data types - CHECKED_STATUS AddColumn(const string& name, DataType type) { + CHECKED_STATUS AddColumn(const std::string& name, DataType type) { return AddColumn(name, QLType::Create(type)); } - CHECKED_STATUS AddNullableColumn(const string& name, const std::shared_ptr& type) { + CHECKED_STATUS AddNullableColumn(const std::string& name, const std::shared_ptr& type) { return AddColumn(name, type, true, false, false, false, 0, ColumnSchema::SortingType::kNotSpecified); } // convenience function for adding columns with simple (non-parametric) data types - CHECKED_STATUS AddNullableColumn(const string& name, DataType type) { + CHECKED_STATUS AddNullableColumn(const std::string& name, DataType type) { return AddNullableColumn(name, QLType::Create(type)); } - CHECKED_STATUS AddColumn(const string& name, + CHECKED_STATUS AddColumn(const std::string& name, const std::shared_ptr& type, bool is_nullable, bool is_hash_key, @@ -1167,7 +1174,7 @@ class SchemaBuilder { yb::ColumnSchema::SortingType sorting_type); // convenience function for adding columns with simple (non-parametric) data types - CHECKED_STATUS AddColumn(const string& name, + CHECKED_STATUS AddColumn(const std::string& name, DataType type, bool is_nullable, bool is_hash_key, @@ -1179,8 +1186,8 @@ class SchemaBuilder { order, sorting_type); } - CHECKED_STATUS RemoveColumn(const string& name); - CHECKED_STATUS RenameColumn(const string& old_name, const string& new_name); + CHECKED_STATUS RemoveColumn(const std::string& name); + CHECKED_STATUS RenameColumn(const std::string& old_name, const std::string& new_name); CHECKED_STATUS AlterProperties(const TablePropertiesPB& pb); private: 
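For reference, a minimal unit-test sketch (not part of this patch) of how the new is_backfilling flag is expected to round-trip through TablePropertiesPB, using only the accessors added above (SetIsBackfilling, IsBackfilling, ToTablePropertiesPB, FromTablePropertiesPB, Reset). The test name and placement are hypothetical; it assumes the gtest setup already used by schema-test.cc.

#include <gtest/gtest.h>
#include "yb/common/schema.h"

namespace yb {

TEST(TestTableProperties, BackfillingFlagRoundTrip) {
  TableProperties props;
  ASSERT_FALSE(props.IsBackfilling());     // Defaults to false.
  props.SetIsBackfilling(true);            // Set while an index backfill is in flight.

  TablePropertiesPB pb;
  props.ToTablePropertiesPB(&pb);          // Serializes the flag.
  ASSERT_TRUE(pb.has_is_backfilling());
  ASSERT_TRUE(pb.is_backfilling());

  TableProperties decoded = TableProperties::FromTablePropertiesPB(pb);
  ASSERT_TRUE(decoded.IsBackfilling());    // Flag survives the round trip.

  decoded.Reset();
  ASSERT_FALSE(decoded.IsBackfilling());   // Reset() clears it again.
}

}  // namespace yb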
diff --git a/src/yb/common/types.h b/src/yb/common/types.h index c7daaafb3949..ce77eb9e8f62 100644 --- a/src/yb/common/types.h +++ b/src/yb/common/types.h @@ -53,7 +53,6 @@ namespace yb { // type we support. const int kLargestTypeSize = sizeof(Slice); -using std::string; class TypeInfo; // This is the important bit of this header: @@ -68,9 +67,9 @@ class TypeInfo { DataType type() const { return type_; } // Returns the type used to actually store the data. DataType physical_type() const { return physical_type_; } - const string& name() const { return name_; } + const std::string& name() const { return name_; } const size_t size() const { return size_; } - void AppendDebugStringForValue(const void *ptr, string *str) const; + void AppendDebugStringForValue(const void *ptr, std::string *str) const; int Compare(const void *lhs, const void *rhs) const; void CopyMinValue(void* dst) const { memcpy(dst, min_value_, size_); @@ -82,11 +81,11 @@ class TypeInfo { const DataType type_; const DataType physical_type_; - const string name_; + const std::string name_; const size_t size_; const void* const min_value_; - typedef void (*AppendDebugFunc)(const void *, string *); + typedef void (*AppendDebugFunc)(const void *, std::string *); const AppendDebugFunc append_func_; typedef int (*CompareFunc)(const void *, const void *); @@ -116,7 +115,7 @@ struct DataTypeTraits { static const char *name() { return "uint8"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -134,7 +133,7 @@ struct DataTypeTraits { static const char *name() { return "int8"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -152,7 +151,7 @@ struct DataTypeTraits { static const char *name() { return "uint16"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -170,7 +169,7 @@ struct DataTypeTraits { static const char *name() { return "int16"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -188,7 +187,7 @@ struct DataTypeTraits { static const char *name() { return "uint32"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -206,7 +205,7 @@ struct DataTypeTraits { static const char *name() { return "int32"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -224,7 +223,7 @@ struct DataTypeTraits { static const char *name() { return "uint64"; } - static void AppendDebugStringForValue(const void *val, 
string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -242,7 +241,7 @@ struct DataTypeTraits { static const char *name() { return "int64"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleItoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -260,7 +259,7 @@ struct DataTypeTraits { static const char *name() { return "float"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleFtoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -278,7 +277,7 @@ struct DataTypeTraits { static const char *name() { return "double"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { str->append(SimpleDtoa(*reinterpret_cast(val))); } static int Compare(const void *lhs, const void *rhs) { @@ -296,7 +295,7 @@ struct DataTypeTraits { static const char *name() { return "binary"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { const Slice *s = reinterpret_cast(val); str->append(strings::CHexEscape(s->ToString())); } @@ -319,7 +318,7 @@ struct DataTypeTraits { static const char* name() { return "bool"; } - static void AppendDebugStringForValue(const void* val, string* str) { + static void AppendDebugStringForValue(const void* val, std::string* str) { str->append(*reinterpret_cast(val) ? 
"true" : "false"); } @@ -339,7 +338,7 @@ struct DerivedTypeTraits { typedef typename DataTypeTraits::cpp_type cpp_type; static const DataType physical_type = PhysicalType; - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { DataTypeTraits::AppendDebugStringForValue(val, str); } @@ -357,7 +356,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "string"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { const Slice *s = reinterpret_cast(val); str->append(strings::Utf8SafeCEscape(s->ToString())); } @@ -368,7 +367,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "inet"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { const Slice *s = reinterpret_cast(val); InetAddress addr; DCHECK(addr.FromSlice(*s).ok()); @@ -381,7 +380,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "jsonb"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { const Slice *s = reinterpret_cast(val); str->append(strings::Utf8SafeCEscape(s->ToString())); } @@ -392,7 +391,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "uuid"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { const Slice *s = reinterpret_cast(val); Uuid uuid; DCHECK(uuid.FromSlice(*s).ok()); @@ -405,7 +404,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "timeuuid"; } - static void AppendDebugStringForValue(const void *val, string *str) { + static void AppendDebugStringForValue(const void *val, std::string *str) { const Slice *s = reinterpret_cast(val); Uuid uuid; DCHECK(uuid.FromSlice(*s).ok()); @@ -473,7 +472,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "decimal"; } - static void AppendDebugDecimalForValue(const void* val, string* str) { + static void AppendDebugDecimalForValue(const void* val, std::string* str) { const Slice *s = reinterpret_cast(val); str->append(strings::Utf8SafeCEscape(s->ToString())); } @@ -484,7 +483,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ static const char* name() { return "varint"; } - static void AppendDebugVarIntForValue(const void* val, string* str) { + static void AppendDebugVarIntForValue(const void* val, std::string* str) { const Slice *s = reinterpret_cast(val); str->append(strings::Utf8SafeCEscape(s->ToString())); } @@ -502,7 +501,7 @@ struct DataTypeTraits : public DerivedTypeTraits{ return "timestamp"; } - static void AppendDebugStringForValue(const void* val, string* str) { + static void AppendDebugStringForValue(const void* val, std::string* str) { int64_t timestamp_micros = *reinterpret_cast(val); time_t secs_since_epoch = timestamp_micros / US_TO_S; // If the time is negative we need to take into account that any microseconds @@ -648,7 +647,7 @@ class Variant { // Set the variant to a STRING type. // The specified data block will be copied, and released by the variant // on the next set/clear call. 
- void Reset(const string& data) { + void Reset(const std::string& data) { Slice slice(data); Reset(STRING, &slice); } diff --git a/src/yb/docdb/docdb_compaction_filter.cc b/src/yb/docdb/docdb_compaction_filter.cc index 5fafdcc9d2a0..0ba8d420bc81 100644 --- a/src/yb/docdb/docdb_compaction_filter.cc +++ b/src/yb/docdb/docdb_compaction_filter.cc @@ -256,7 +256,7 @@ Result DocDBCompactionFilter::DoFilter( if (has_expired) { // This is consistent with the condition we're testing for deletes at the bottom of the function // because ht_at_or_below_cutoff is implied by has_expired. - if (is_major_compaction_) { + if (is_major_compaction_ && !retention_.retain_delete_markers_in_major_compaction) { return FilterDecision::kDiscard; } @@ -284,12 +284,20 @@ Result DocDBCompactionFilter::DoFilter( within_merge_block_ = false; } + // If we are backfilling an index table, we want to preserve the delete markers in the table + // until the backfill process is completed. For other normal use cases, delete markers/tombstones + // can be cleaned up on a major compaction. + // retention_.retain_delete_markers_in_major_compaction will be set to true until the index + // backfill is complete. + // // Tombstones at or below the history cutoff hybrid_time can always be cleaned up on full (major) // compactions. However, we do need to update the overwrite hybrid time stack in this case (as we // just did), because this deletion (tombstone) entry might be the only reason for cleaning up // more entries appearing at earlier hybrid times. - return value_type == ValueType::kTombstone && is_major_compaction_ ? FilterDecision::kDiscard - : FilterDecision::kKeep; + return value_type == ValueType::kTombstone && is_major_compaction_ && + !retention_.retain_delete_markers_in_major_compaction + ? FilterDecision::kDiscard + : FilterDecision::kKeep; } void DocDBCompactionFilter::AssignPrevSubDocKey( @@ -336,11 +344,9 @@ const char* DocDBCompactionFilterFactory::Name() const { HistoryRetentionDirective ManualHistoryRetentionPolicy::GetRetentionDirective() { std::lock_guard lock(deleted_cols_mtx_); - return { - history_cutoff_.load(std::memory_order_acquire), - std::make_shared(deleted_cols_), - table_ttl_.load(std::memory_order_acquire) - }; + return {history_cutoff_.load(std::memory_order_acquire), + std::make_shared(deleted_cols_), table_ttl_.load(std::memory_order_acquire), + ShouldRetainDeleteMarkersInMajorCompaction::kFalse}; } void ManualHistoryRetentionPolicy::SetHistoryCutoff(HybridTime history_cutoff) { diff --git a/src/yb/docdb/docdb_compaction_filter.h b/src/yb/docdb/docdb_compaction_filter.h index 017abe8116a7..ec45458593cb 100644 --- a/src/yb/docdb/docdb_compaction_filter.h +++ b/src/yb/docdb/docdb_compaction_filter.h @@ -36,6 +36,7 @@ namespace yb { namespace docdb { YB_STRONGLY_TYPED_BOOL(IsMajorCompaction); +YB_STRONGLY_TYPED_BOOL(ShouldRetainDeleteMarkersInMajorCompaction); struct Expiration; @@ -52,6 +53,8 @@ struct HistoryRetentionDirective { ColumnIdsPtr deleted_cols; MonoDelta table_ttl; + + ShouldRetainDeleteMarkersInMajorCompaction retain_delete_markers_in_major_compaction{false}; }; // DocDB compaction filter. A new instance of this class is created for every compaction. 
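For reference, a standalone sketch (not from this patch) that isolates the tombstone decision DoFilter() makes after this change, so the interaction between a major compaction and the new retain_delete_markers_in_major_compaction directive is easy to see; all names below are local to the sketch, not the real filter.

#include <iostream>

enum class FilterDecision { kKeep, kDiscard };

// A tombstone at or below the history cutoff may only be dropped on a major
// compaction, and only if delete markers are NOT being retained for an
// in-flight index backfill.
FilterDecision DecideForTombstone(bool is_major_compaction,
                                  bool retain_delete_markers_in_major_compaction) {
  return is_major_compaction && !retain_delete_markers_in_major_compaction
             ? FilterDecision::kDiscard
             : FilterDecision::kKeep;
}

int main() {
  // Backfill in progress: delete markers survive even a major compaction.
  std::cout << (DecideForTombstone(true, true) == FilterDecision::kKeep) << "\n";
  // Normal case: a major compaction may clean tombstones up.
  std::cout << (DecideForTombstone(true, false) == FilterDecision::kDiscard) << "\n";
  // Minor compactions never drop them.
  std::cout << (DecideForTombstone(false, false) == FilterDecision::kKeep) << "\n";
  return 0;
}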
diff --git a/src/yb/integration-tests/cassandra_cpp_driver-test.cc b/src/yb/integration-tests/cassandra_cpp_driver-test.cc index 66afee659978..dea3a048e036 100644 --- a/src/yb/integration-tests/cassandra_cpp_driver-test.cc +++ b/src/yb/integration-tests/cassandra_cpp_driver-test.cc @@ -15,12 +15,19 @@ #include +#include "yb/client/client-internal.h" +#include "yb/client/client-test-util.h" +#include "yb/client/client.h" +#include "yb/client/table_alterer.h" +#include "yb/client/table_creator.h" +#include "yb/client/table_handle.h" #include "yb/gutil/strings/join.h" #include "yb/gutil/strings/strip.h" #include "yb/gutil/strings/substitute.h" #include "yb/integration-tests/external_mini_cluster-itest-base.h" -#include "yb/util/metrics.h" +#include "yb/util/backoff_waiter.h" #include "yb/util/jsonreader.h" +#include "yb/util/metrics.h" #include "yb/util/random_util.h" #include "yb/util/size_literals.h" @@ -36,6 +43,11 @@ using std::get; using rapidjson::Value; using strings::Substitute; +using yb::client::YBTableName; +using yb::client::YBTableInfo; +using yb::CoarseBackoffWaiter; +using yb::YQLDatabase; + METRIC_DECLARE_entity(server); METRIC_DECLARE_histogram(handler_latency_yb_client_write_remote); METRIC_DECLARE_histogram(handler_latency_yb_client_read_remote); @@ -327,11 +339,26 @@ class CassandraSession { return future.Result(); } + CassandraFuture ExecuteGetFuture(const CassandraStatement& statement) { + return CassandraFuture( + cass_session_execute(cass_session_.get(), statement.cass_statement_.get())); + } + + CassandraFuture ExecuteGetFuture(const string& query) { + LOG(INFO) << "Execute query: " << query; + return ExecuteGetFuture(CassandraStatement(query)); + } + CHECKED_STATUS ExecuteQuery(const string& query) { LOG(INFO) << "Execute query: " << query; return Execute(CassandraStatement(query)); } + Result ExecuteWithResult(const string& query) { + LOG(INFO) << "Execute query: " << query; + return Execute(CassandraStatement(query)); + } + CHECKED_STATUS ExecuteBatch(const CassandraBatch& batch) { return SubmitBatch(batch).Wait(); } @@ -426,7 +453,7 @@ class CppCassandraDriverTest : public ExternalMiniClusterITestBase { LOG(INFO) << "Starting YB ExternalMiniCluster..."; // Start up with 3 (default) tablet servers. 
-    ASSERT_NO_FATALS(StartCluster(ExtraTServerFlags(), {}, 3, NumMasters()));
+    ASSERT_NO_FATALS(StartCluster(ExtraTServerFlags(), ExtraMasterFlags(), 3, NumMasters()));
     driver_.reset(CHECK_NOTNULL(new CppCassandraDriver(*cluster_, UsePartitionAwareRouting())));
@@ -455,6 +482,10 @@ class CppCassandraDriverTest : public ExternalMiniClusterITestBase {
     return {};
   }
+  virtual std::vector<std::string> ExtraMasterFlags() {
+    return {};
+  }
+
   virtual int NumMasters() {
     return 1;
   }
@@ -468,6 +499,19 @@
   CassandraSession session_;
 };
+class CppCassandraDriverTestIndex : public CppCassandraDriverTest {
+ public:
+  virtual std::vector<std::string> ExtraTServerFlags() {
+    return {"--allow_index_table_read_write=true"};
+  }
+
+  virtual std::vector<std::string> ExtraMasterFlags() {
+    return {"--enable_load_balancing=false",
+            "--disable_index_backfill=false",
+            "--TEST_slowdown_backfill_alter_table_rpcs_ms=500"};
+  }
+};
+
 //------------------------------------------------------------------------------
 struct cass_json_t {
@@ -1083,6 +1127,73 @@ TEST_F(CppCassandraDriverTest, TestLongJson) {
   }
 }
+Result<IndexPermissions> GetIndexPermissions(
+    client::YBClient* client, const YBTableName& table_name, const YBTableName& index_table_name) {
+  Result<YBTableInfo> table_info = client->GetYBTableInfo(table_name);
+  if (!table_info) {
+    RETURN_NOT_OK_PREPEND(table_info.status(),
+                          "Unable to fetch table info for the main table " +
+                          table_name.ToString());
+  }
+  Result<YBTableInfo> index_table_info = client->GetYBTableInfo(index_table_name);
+  if (!index_table_info) {
+    RETURN_NOT_OK_PREPEND(index_table_info.status(),
+        "Unable to fetch table info for the index table " + index_table_name.ToString());
+  }
+
+  IndexInfoPB index_info_pb;
+  table_info->index_map[index_table_info->table_id].ToPB(&index_info_pb);
+  LOG(INFO) << "The index info for " << index_table_name.ToString() << " is "
+            << yb::ToString(index_info_pb);
+
+  if (!index_info_pb.has_index_permissions()) {
+    return STATUS(NotFound, "IndexPermissions not found in index info.");
+  }
+
+  return index_info_pb.index_permissions();
+}
+
+IndexPermissions WaitUntilIndexPermissionIsAtLeast(
+    client::YBClient* client, const YBTableName& table_name, const YBTableName& index_table_name,
+    IndexPermissions min_permission, bool exponential_backoff = true) {
+  CoarseBackoffWaiter waiter(CoarseMonoClock::Now() + 90s,
+                             (exponential_backoff ?
CoarseMonoClock::Duration::max() : 50ms)); + Result result = GetIndexPermissions(client, table_name, index_table_name); + while (!result || *result < min_permission) { + LOG(INFO) << "Waiting since GetIndexPermissions returned " << result.ToString(); + waiter.Wait(); + result = GetIndexPermissions(client, table_name, index_table_name); + } + return *result; +} + +TEST_F_EX(CppCassandraDriverTest, TestCreateIndex, CppCassandraDriverTestIndex) { + ASSERT_OK( + session_.ExecuteQuery("create table test_table (k int primary key, v text) " + "with transactions = {'enabled' : true};")); + + LOG(INFO) << "Inserting one row"; + ASSERT_OK(session_.ExecuteQuery("insert into test_table (k, v) values (1, 'one');")); + LOG(INFO) << "Creating index"; + ASSERT_OK(session_.ExecuteQuery("create index test_table_index_by_v on test_table (v);")); + + LOG(INFO) << "Inserting one row"; + ASSERT_OK(session_.ExecuteQuery("insert into test_table (k, v) values (2, 'two');")); + ASSERT_OK(session_.ExecuteQuery("insert into test_table (k, v) values (3, 'three');")); + + constexpr auto kNamespace = "test"; + const YBTableName table_name(YQL_DATABASE_CQL, kNamespace, "test_table"); + const YBTableName index_table_name(YQL_DATABASE_CQL, kNamespace, "test_table_index_by_v"); + IndexPermissions perm = WaitUntilIndexPermissionIsAtLeast( + client_.get(), table_name, index_table_name, + IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE); + ASSERT_TRUE(perm == IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE); + + LOG(INFO) << "Selecting one row from the main table"; + ASSERT_OK(session_.ExecuteQuery("select * from test_table where v = 'one';")); + LOG(INFO) << "Done selecting row(s) from the main table."; +} + TEST_F(CppCassandraDriverTest, TestPrepare) { typedef TestTable MyTable; MyTable table; diff --git a/src/yb/integration-tests/master_failover-itest.cc b/src/yb/integration-tests/master_failover-itest.cc index 05216064a5cc..67093ab52a9b 100644 --- a/src/yb/integration-tests/master_failover-itest.cc +++ b/src/yb/integration-tests/master_failover-itest.cc @@ -35,13 +35,15 @@ #include #include -#include "yb/client/client.h" #include "yb/client/client-internal.h" +#include "yb/client/client-test-util.h" +#include "yb/client/client.h" #include "yb/client/table_alterer.h" #include "yb/client/table_creator.h" #include "yb/client/table_handle.h" #include "yb/common/schema.h" +#include "yb/common/wire_protocol-test-util.h" #include "yb/gutil/strings/substitute.h" #include "yb/gutil/strings/util.h" #include "yb/integration-tests/external_mini_cluster.h" @@ -51,6 +53,8 @@ using namespace std::literals; +DECLARE_int32(yb_num_total_tablets); + namespace yb { // Note: this test needs to be in the client namespace in order for @@ -113,17 +117,45 @@ class MasterFailoverTest : public YBTest { } Status CreateTable(const YBTableName& table_name, CreateTableMode mode) { - YBSchema schema; - YBSchemaBuilder b; - b.AddColumn("key")->Type(INT32)->NotNull()->PrimaryKey(); - b.AddColumn("int_val")->Type(INT32)->NotNull(); - b.AddColumn("string_val")->Type(STRING)->NotNull(); - CHECK_OK(b.Build(&schema)); + RETURN_NOT_OK_PREPEND( + client_->CreateNamespaceIfNotExists(table_name.namespace_name()), + "Unable to create namespace " + table_name.namespace_name()); + client::YBSchema client_schema(client::YBSchemaFromSchema(yb::GetSimpleTestSchema())); std::unique_ptr table_creator(client_->NewTableCreator()); return table_creator->table_name(table_name) - .schema(&schema) + .table_type(YBTableType::YQL_TABLE_TYPE) + .schema(&client_schema) + 
.hash_schema(YBHashSchema::kMultiColumnHash) + .timeout(MonoDelta::FromSeconds(90)) + .wait(mode == kWaitForCreate) + .Create(); + } + + Status CreateIndex( + const YBTableName& indexed_table_name, const YBTableName& index_name, CreateTableMode mode) { + RETURN_NOT_OK_PREPEND( + client_->CreateNamespaceIfNotExists(index_name.namespace_name()), + "Unable to create namespace " + index_name.namespace_name()); + client::YBSchema client_schema(client::YBSchemaFromSchema(yb::GetSimpleTestSchema())); + client::TableHandle table; + RETURN_NOT_OK_PREPEND( + table.Open(indexed_table_name, client_.get()), + "Unable to open table " + indexed_table_name.ToString()); + + std::unique_ptr table_creator(client_->NewTableCreator()); + return table_creator->table_name(index_name) + .table_type(YBTableType::YQL_TABLE_TYPE) + .indexed_table_id(table->id()) + .schema(&client_schema) + .hash_schema(YBHashSchema::kMultiColumnHash) .timeout(MonoDelta::FromSeconds(90)) .wait(mode == kWaitForCreate) + // In the new style create index request, the CQL proxy populates the + // index info instead of the master. However, in these tests we bypass + // the proxy and go directly to the master. We need to use the old + // style create request to have the master generate the appropriate + // index info. + .TEST_use_old_style_create_request() .Create(); } @@ -161,6 +193,19 @@ class MasterFailoverTest : public YBTest { std::unique_ptr client_; }; +class MasterFailoverTestIndexCreation : public MasterFailoverTest { + public: + MasterFailoverTestIndexCreation() { + opts_.extra_tserver_flags.push_back("--allow_index_table_read_write=true"); + opts_.extra_master_flags.push_back("--TEST_slowdown_backfill_alter_table_rpcs_ms=50"); + opts_.extra_master_flags.push_back("--disable_index_backfill=false"); + // Sometimes during master failover we have the create index kick in before the tservers have + // checked in. By default we wait for enough TSs -- else we fail the create table/idx request. + // We don't have to wait for that in the tests here. + opts_.extra_master_flags.push_back("--catalog_manager_check_ts_count_for_create_table=false"); + } +}; + // Test that synchronous CreateTable (issue CreateTable call and then // wait until the table has been created) works even when the original // leader master has been paused. @@ -175,7 +220,7 @@ TEST_F(MasterFailoverTest, DISABLED_TestCreateTableSync) { return; } - int leader_idx; + int leader_idx = -1; ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); LOG(INFO) << "Pausing leader master"; @@ -199,7 +244,7 @@ TEST_F(MasterFailoverTest, DISABLED_TestPauseAfterCreateTableIssued) { return; } - int leader_idx; + int leader_idx = -1; ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); YBTableName table_name(YQL_DATABASE_CQL, "testPauseAfterCreateTableIssued"); @@ -217,6 +262,87 @@ TEST_F(MasterFailoverTest, DISABLED_TestPauseAfterCreateTableIssued) { ASSERT_OK(OpenTableAndScanner(table_name)); } +// Orchestrate a master failover at various points of a backfill, +// ensure that the backfill eventually completes. 
+TEST_F(MasterFailoverTestIndexCreation, TestPauseAfterCreateIndexIssued) { + YBTableName table_name(YQL_DATABASE_CQL, "test", "testPauseAfterCreateTableIssued"); + LOG(INFO) << "Issuing CreateTable for " << table_name.ToString(); + FLAGS_yb_num_total_tablets = 5; + ASSERT_OK(CreateTable(table_name, kWaitForCreate)); + LOG(INFO) << "CreateTable done for " << table_name.ToString(); + + MonoDelta total_time_taken_for_one_iteration; + constexpr int kNumLoops = 10; + for (int i = 0; i < kNumLoops; i++) { + auto start = ToSteady(CoarseMonoClock::Now()); + int leader_idx = -1; + ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); + ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx)); + + consensus::OpId op_id; + ASSERT_OK(cluster_->GetLastOpIdForLeader(&op_id)); + ASSERT_OK(cluster_->WaitForMastersToCommitUpTo(static_cast(op_id.index()))); + + YBTableName index_table_name( + YQL_DATABASE_CQL, "test", "testPauseAfterCreateTableIssuedIdx" + yb::ToString(i)); + LOG(INFO) << "Issuing CreateIndex for " << index_table_name.ToString(); + ASSERT_OK(CreateIndex(table_name, index_table_name, kNoWaitForCreate)); + + if (i > 0) { + // In the first run, we estimate how long it takes for an uninterrupted + // backfill process to complete, then the remaining iterations kill the + // master leader at various points to cause the master failover during + // the various stages of index backfill. + MonoDelta sleep_time = total_time_taken_for_one_iteration * i / kNumLoops; + LOG(INFO) << "Sleeping for " << sleep_time << ", before master pause"; + SleepFor(sleep_time); + + LOG(INFO) << "Pausing leader master 0-based: " << leader_idx << " i.e. m-" + << 1 + leader_idx; + ASSERT_OK(cluster_->master(leader_idx)->Pause()); + } + + IndexInfoPB index_info_pb; + TableId index_table_id; + const auto deadline = CoarseMonoClock::Now() + 900s; + do { + ASSERT_OK(client_->data_->WaitForCreateTableToFinish( + client_.get(), index_table_name, "" /* table_id */, deadline)); + ASSERT_OK(client_->data_->WaitForCreateTableToFinish( + client_.get(), table_name, "" /* table_id */, deadline)); + + Result table_info = client_->GetYBTableInfo(table_name); + ASSERT_TRUE(table_info); + Result index_table_info = client_->GetYBTableInfo(index_table_name); + ASSERT_TRUE(index_table_info); + + index_table_id = index_table_info->table_id; + index_info_pb.Clear(); + table_info->index_map[index_table_id].ToPB(&index_info_pb); + YB_LOG_EVERY_N_SECS(INFO, 1) << "The index info for " + << index_table_name.ToString() << " is " + << yb::ToString(index_info_pb); + + ASSERT_TRUE(index_info_pb.has_index_permissions()); + } while (index_info_pb.index_permissions() != + IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE && + CoarseMonoClock::Now() < deadline); + + ASSERT_EQ( + index_info_pb.index_permissions(), IndexPermissions::INDEX_PERM_READ_WRITE_AND_DELETE); + + LOG(INFO) << "All good for iteration " << i; + ASSERT_OK(client_->DeleteIndexTable(index_table_name, nullptr, /* wait */ true)); + ASSERT_OK(client_->data_->WaitForDeleteTableToFinish( + client_.get(), index_table_id, deadline)); + + // For the first round we just simply calculate the time it takes + if (i == 0) { + total_time_taken_for_one_iteration = ToSteady(CoarseMonoClock::Now()) - start; + } + } +} + // Test the scenario where we create a table, pause the leader master, // and then issue the DeleteTable call: DeleteTable should go to the newly // elected leader master and succeed. 
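The loop above waits for the index to reach INDEX_PERM_READ_WRITE_AND_DELETE. For reference, a standalone sketch (not from this patch) of how the IndexPermissions values added to common.proto map onto the AllowDelete/AllowWrites/AllowReads/Backfilling predicates that IndexInfo gains earlier in this patch; the enum values are copied from the proto, everything else is illustrative.

#include <cstdio>

enum IndexPermissions {
  INDEX_PERM_DELETE_ONLY = 0,
  INDEX_PERM_WRITE_AND_DELETE = 2,
  INDEX_PERM_DO_BACKFILL = 4,
  INDEX_PERM_READ_WRITE_AND_DELETE = 6,
  INDEX_PERM_BACKFILL_FAILED = 8,
};

int main() {
  struct { IndexPermissions perm; const char* name; } states[] = {
      {INDEX_PERM_DELETE_ONLY, "DELETE_ONLY"},
      {INDEX_PERM_WRITE_AND_DELETE, "WRITE_AND_DELETE"},
      {INDEX_PERM_DO_BACKFILL, "DO_BACKFILL"},
      {INDEX_PERM_READ_WRITE_AND_DELETE, "READ_WRITE_AND_DELETE"},
      {INDEX_PERM_BACKFILL_FAILED, "BACKFILL_FAILED"},
  };
  std::printf("%-22s %-7s %-6s %-5s %s\n", "permission", "delete", "write", "read", "backfilling");
  for (const auto& s : states) {
    // These conditions mirror AllowDelete()/AllowWrites()/AllowReads()/Backfilling() in IndexInfo.
    const bool allow_delete = s.perm >= INDEX_PERM_DELETE_ONLY &&
                              s.perm <= INDEX_PERM_READ_WRITE_AND_DELETE;
    const bool allow_writes = s.perm >= INDEX_PERM_WRITE_AND_DELETE &&
                              s.perm <= INDEX_PERM_READ_WRITE_AND_DELETE;
    const bool allow_reads = s.perm == INDEX_PERM_READ_WRITE_AND_DELETE;
    const bool backfilling = s.perm == INDEX_PERM_DO_BACKFILL;
    std::printf("%-22s %-7d %-6d %-5d %d\n", s.name, static_cast<int>(allow_delete),
                static_cast<int>(allow_writes), static_cast<int>(allow_reads),
                static_cast<int>(backfilling));
  }
  return 0;
}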
@@ -226,11 +352,11 @@ TEST_F(MasterFailoverTest, TestDeleteTableSync) { return; } - int leader_idx; + int leader_idx = -1; ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); - YBTableName table_name(YQL_DATABASE_CQL, "testDeleteTableSync"); + YBTableName table_name(YQL_DATABASE_CQL, "test", "testDeleteTableSync"); ASSERT_OK(CreateTable(table_name, kWaitForCreate)); LOG(INFO) << "Pausing leader master"; @@ -256,18 +382,18 @@ TEST_F(MasterFailoverTest, TestRenameTableSync) { return; } - int leader_idx; + int leader_idx = -1; ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx)); - YBTableName table_name_orig(YQL_DATABASE_CQL, "testAlterTableSync"); + YBTableName table_name_orig(YQL_DATABASE_CQL, "test", "testAlterTableSync"); ASSERT_OK(CreateTable(table_name_orig, kWaitForCreate)); LOG(INFO) << "Pausing leader master"; ASSERT_OK(cluster_->master(leader_idx)->Pause()); ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx)); - YBTableName table_name_new(YQL_DATABASE_CQL, "testAlterTableSyncRenamed"); + YBTableName table_name_new(YQL_DATABASE_CQL, "test", "testAlterTableSyncRenamed"); ASSERT_OK(RenameTable(table_name_orig, table_name_new)); shared_ptr table; ASSERT_OK(client_->OpenTable(table_name_new, &table)); diff --git a/src/yb/master/CMakeLists.txt b/src/yb/master/CMakeLists.txt index 7f0a8a6fa1f1..16e4a12f177f 100644 --- a/src/yb/master/CMakeLists.txt +++ b/src/yb/master/CMakeLists.txt @@ -60,6 +60,7 @@ set(MASTER_SRCS async_flush_tablets_task.cc async_rpc_tasks.cc call_home.cc + backfill_index.cc catalog_manager.cc catalog_manager_util.cc catalog_entity_info.cc diff --git a/src/yb/master/async_rpc_tasks.cc b/src/yb/master/async_rpc_tasks.cc index ebbea0df7227..e981d19c4c6d 100644 --- a/src/yb/master/async_rpc_tasks.cc +++ b/src/yb/master/async_rpc_tasks.cc @@ -43,6 +43,10 @@ DEFINE_int32(unresponsive_ts_rpc_retry_limit, 20, "to perform operations such as deleting a tablet."); TAG_FLAG(unresponsive_ts_rpc_retry_limit, advanced); +DEFINE_int32(retrying_ts_rpc_max_delay_ms, 60 * 1000, + "Maximum delay between successive attempts to contact an unresponsive tablet server"); +TAG_FLAG(retrying_ts_rpc_max_delay_ms, advanced); + DEFINE_test_flag( int32, slowdown_master_async_rpc_tasks_by_ms, 0, "For testing purposes, slow down the run method to take longer."); @@ -145,9 +149,7 @@ Status RetryingTSRpcTask::Run() { } // Calculate and set the timeout deadline. - MonoTime timeout = MonoTime::Now(); - timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms)); - const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_); + const MonoTime deadline = ComputeDeadline(); rpc_.set_deadline(deadline); if (!PerformStateTransition(MonitoredTaskState::kWaiting, MonitoredTaskState::kRunning)) { @@ -177,6 +179,12 @@ Status RetryingTSRpcTask::Run() { return Status::OK(); } +MonoTime RetryingTSRpcTask::ComputeDeadline() { + MonoTime timeout = MonoTime::Now(); + timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms)); + return MonoTime::Earliest(timeout, deadline_); +} + // Abort this task and return its value before it was successfully aborted. If the task entered // a different terminal state before we were able to abort it, return that state. MonitoredTaskState RetryingTSRpcTask::AbortAndReturnPrevState() { @@ -225,6 +233,12 @@ void RetryingTSRpcTask::DoRpcCallback() { UnregisterAsyncTask(); // May call 'delete this'. 
} +int RetryingTSRpcTask::num_max_retries() { return FLAGS_unresponsive_ts_rpc_retry_limit; } + +int RetryingTSRpcTask::max_delay_ms() { + return FLAGS_retrying_ts_rpc_max_delay_ms; +} + bool RetryingTSRpcTask::RescheduleWithBackoffDelay() { auto task_state = state(); if (task_state != MonitoredTaskState::kRunning) { @@ -238,7 +252,7 @@ bool RetryingTSRpcTask::RescheduleWithBackoffDelay() { if (NoRetryTaskType()) { attempt_threshold = 0; } else if (RetryLimitTaskType()) { - attempt_threshold = FLAGS_unresponsive_ts_rpc_retry_limit; + attempt_threshold = num_max_retries(); } if (attempt_ > attempt_threshold) { @@ -258,7 +272,7 @@ bool RetryingTSRpcTask::RescheduleWithBackoffDelay() { if (attempt_ <= 12) { base_delay_ms = 1 << (attempt_ + 3); // 1st retry delayed 2^4 ms, 2nd 2^5, etc. } else { - base_delay_ms = 60 * 1000; // cap at 1 minute + base_delay_ms = max_delay_ms(); } // Normal rand is seeded by default with 1. Using the same for rand_r seed. unsigned int seed = 1; @@ -522,9 +536,7 @@ AsyncAlterTable::AsyncAlterTable(Master *master, tablet_(tablet) { } -string AsyncAlterTable::description() const { - return tablet_->ToString() + " Alter Table RPC"; -} +string AsyncAlterTable::description() const { return tablet_->ToString() + type_name() + " RPC"; } TabletId AsyncAlterTable::tablet_id() const { return tablet_->tablet_id(); @@ -543,33 +555,48 @@ void AsyncAlterTable::HandleResponse(int attempt) { case TabletServerErrorPB::TABLET_NOT_FOUND: case TabletServerErrorPB::MISMATCHED_SCHEMA: case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA: - LOG(WARNING) << "TS " << permanent_uuid() << ": alter failed for tablet " - << tablet_->ToString() << " no further retry: " << status.ToString(); + LOG(WARNING) << "TS " << permanent_uuid() << ": failed " + << description() << ". " << status.ToString() + << " for version " << schema_version_; TransitionToTerminalState(MonitoredTaskState::kRunning, MonitoredTaskState::kComplete); break; default: - LOG(WARNING) << "TS " << permanent_uuid() << ": alter failed for tablet " - << tablet_->ToString() << ": " << status.ToString(); + LOG(WARNING) << "TS " << permanent_uuid() << ": failed " + << description() << ". " << status.ToString() + << " for version " << schema_version_; break; } } else { TransitionToTerminalState(MonitoredTaskState::kRunning, MonitoredTaskState::kComplete); - VLOG(1) << "TS " << permanent_uuid() << ": alter complete on tablet " << tablet_->ToString(); + VLOG(1) << "TS " << permanent_uuid() << ": completed " << description() + << " for version " << schema_version_; } server::UpdateClock(resp_, master_->clock()); if (state() == MonitoredTaskState::kComplete) { // TODO: proper error handling here. 
- CHECK_OK(master_->catalog_manager()->HandleTabletSchemaVersionReport( + auto s = (master_->catalog_manager()->HandleTabletSchemaVersionReport( tablet_.get(), schema_version_)); + if (!s.ok()) { + LOG(FATAL) << "Check failed " << s << " failed for " + << tablet_->ToString() << " description " << description() + << " while running " + << "AsyncAlterTable::HandleResponse" + << " with response " << resp_.DebugString(); + } } else { - VLOG(1) << "Task is not completed"; + VLOG(1) << "Task is not completed" << tablet_->ToString() << " for version " + << schema_version_; } } bool AsyncAlterTable::SendRequest(int attempt) { + VLOG(1) << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() + << "version " << schema_version_ << " waiting for a read lock."; auto l = table_->LockForRead(); + VLOG(1) << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() + << "version " << schema_version_ << " obtained the read lock."; tserver::ChangeMetadataRequestPB req; req.set_schema_version(l->data().pb.version()); @@ -590,7 +617,29 @@ bool AsyncAlterTable::SendRequest(int attempt) { l->Unlock(); ts_admin_proxy_->AlterSchemaAsync(req, &resp_, &rpc_, BindRpcCallback()); - VLOG(1) << "Send alter table request to " << permanent_uuid() + VLOG(1) << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() + << " (attempt " << attempt << "):\n" + << req.DebugString(); + return true; +} + +bool AsyncBackfillDone::SendRequest(int attempt) { + VLOG(1) << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() + << "version " << schema_version_ << " waiting for a read lock."; + auto l = table_->LockForRead(); + VLOG(1) << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() + << "version " << schema_version_ << " obtained the read lock."; + + tserver::ChangeMetadataRequestPB req; + req.set_dest_uuid(permanent_uuid()); + req.set_tablet_id(tablet_->tablet_id()); + req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); + req.set_is_backfilling(false); + schema_version_ = l->data().pb.version(); + l->Unlock(); + + ts_admin_proxy_->BackfillDoneAsync(req, &resp_, &rpc_, BindRpcCallback()); + VLOG(1) << "Send backfill done request to " << permanent_uuid() << " for " << tablet_->tablet_id() << " (attempt " << attempt << "):\n" << req.DebugString(); return true; diff --git a/src/yb/master/async_rpc_tasks.h b/src/yb/master/async_rpc_tasks.h index e0686c34c212..3105fc8f27cf 100644 --- a/src/yb/master/async_rpc_tasks.h +++ b/src/yb/master/async_rpc_tasks.h @@ -161,6 +161,7 @@ class RetryingTSRpcTask : public MonitoredTask { void AbortTask(); + virtual MonoTime ComputeDeadline(); // Callback meant to be invoked from asynchronous RPC service proxy calls. void RpcCallback(); @@ -225,6 +226,9 @@ class RetryingTSRpcTask : public MonitoredTask { // Only abort this task on reactor if it has been scheduled. void AbortIfScheduled(); + virtual int num_max_retries(); + virtual int max_delay_ms(); + // Use state() and MarkX() accessors. 
std::atomic state_; }; @@ -335,17 +339,33 @@ class AsyncAlterTable : public RetryingTSRpcTask { std::string description() const override; - private: + protected: + TabletServerId permanent_uuid() const; + + uint32_t schema_version_; + scoped_refptr tablet_; + TabletId tablet_id() const override; - TabletServerId permanent_uuid() const; + tserver::ChangeMetadataResponsePB resp_; + private: void HandleResponse(int attempt) override; bool SendRequest(int attempt) override; +}; - uint32_t schema_version_; - scoped_refptr tablet_; - tserver::ChangeMetadataResponsePB resp_; +class AsyncBackfillDone : public AsyncAlterTable { + public: + AsyncBackfillDone( + Master* master, ThreadPool* callback_pool, const scoped_refptr& tablet) + : AsyncAlterTable(master, callback_pool, tablet) {} + + Type type() const override { return ASYNC_BACKFILL_DONE; } + + std::string type_name() const override { return "Mark backfill done."; } + + private: + bool SendRequest(int attempt) override; }; class AsyncCopartitionTable : public RetryingTSRpcTask { diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc new file mode 100644 index 000000000000..c212e59cdb33 --- /dev/null +++ b/src/yb/master/backfill_index.cc @@ -0,0 +1,774 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
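+//
+// This file contains the master-side machinery for online index backfill:
+// MultiStageAlterTable drives the index-permission changes on the indexed table,
+// BackfillTable and BackfillTablet track the backfill for the table and for each of its
+// tablets, and GetSafeTimeForTablet and BackfillChunk are the per-tablet RPC tasks used
+// along the way.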
+// +#include "yb/master/backfill_index.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include "yb/common/common_flags.h" +#include "yb/common/partial_row.h" +#include "yb/common/partition.h" +#include "yb/common/roles_permissions.h" +#include "yb/common/wire_protocol.h" +#include "yb/consensus/consensus.h" +#include "yb/consensus/consensus.proxy.h" +#include "yb/consensus/consensus_peers.h" +#include "yb/consensus/quorum_util.h" +#include "yb/gutil/atomicops.h" +#include "yb/gutil/map-util.h" +#include "yb/gutil/mathlimits.h" +#include "yb/gutil/stl_util.h" +#include "yb/gutil/strings/escaping.h" +#include "yb/gutil/strings/join.h" +#include "yb/gutil/strings/substitute.h" +#include "yb/gutil/sysinfo.h" +#include "yb/gutil/walltime.h" +#include "yb/master/async_rpc_tasks.h" +#include "yb/master/catalog_loaders.h" +#include "yb/master/catalog_manager_bg_tasks.h" +#include "yb/master/catalog_manager_util.h" +#include "yb/master/cluster_balance.h" +#include "yb/master/master.h" +#include "yb/master/master.pb.h" +#include "yb/master/master.proxy.h" +#include "yb/master/master_util.h" +#include "yb/master/sys_catalog.h" +#include "yb/master/system_tablet.h" +#include "yb/master/tasks_tracker.h" +#include "yb/master/ts_descriptor.h" +#include "yb/master/ts_manager.h" +#include "yb/master/yql_aggregates_vtable.h" +#include "yb/master/yql_auth_resource_role_permissions_index.h" +#include "yb/master/yql_auth_role_permissions_vtable.h" +#include "yb/master/yql_auth_roles_vtable.h" +#include "yb/master/yql_columns_vtable.h" +#include "yb/master/yql_empty_vtable.h" +#include "yb/master/yql_functions_vtable.h" +#include "yb/master/yql_indexes_vtable.h" +#include "yb/master/yql_keyspaces_vtable.h" +#include "yb/master/yql_local_vtable.h" +#include "yb/master/yql_partitions_vtable.h" +#include "yb/master/yql_peers_vtable.h" +#include "yb/master/yql_size_estimates_vtable.h" +#include "yb/master/yql_tables_vtable.h" +#include "yb/master/yql_triggers_vtable.h" +#include "yb/master/yql_types_vtable.h" +#include "yb/master/yql_views_vtable.h" + +#include "yb/rpc/messenger.h" +#include "yb/tserver/ts_tablet_manager.h" + +#include "yb/tablet/operations/change_metadata_operation.h" +#include "yb/tablet/tablet.h" +#include "yb/tablet/tablet_metadata.h" + +#include "yb/tserver/tserver_admin.proxy.h" +#include "yb/yql/redis/redisserver/redis_constants.h" + +#include "yb/util/crypt.h" +#include "yb/util/debug-util.h" +#include "yb/util/debug/trace_event.h" +#include "yb/util/flag_tags.h" +#include "yb/util/logging.h" +#include "yb/util/math_util.h" +#include "yb/util/monotime.h" +#include "yb/util/random_util.h" +#include "yb/util/rw_mutex.h" +#include "yb/util/stopwatch.h" +#include "yb/util/thread.h" +#include "yb/util/thread_restrictions.h" +#include "yb/util/threadpool.h" +#include "yb/util/trace.h" +#include "yb/util/tsan_util.h" +#include "yb/util/uuid.h" + +#include "yb/client/client.h" +#include "yb/client/meta_cache.h" +#include "yb/client/table_creator.h" +#include "yb/client/table_handle.h" +#include "yb/client/yb_table_name.h" + +#include "yb/tserver/remote_bootstrap_client.h" + +DEFINE_int32(index_backfill_rpc_timeout_ms, 1 * 60 * 60 * 1000, // 1 hour. 
+ "Timeout used by the master when attempting to backfilll a tablet " + "during index creation."); +TAG_FLAG(index_backfill_rpc_timeout_ms, advanced); +TAG_FLAG(index_backfill_rpc_timeout_ms, runtime); + +DEFINE_int32(index_backfill_rpc_max_retries, 150, + "Number of times to retry backfilling a tablet chunk " + "during index creation."); +TAG_FLAG(index_backfill_rpc_max_retries, advanced); +TAG_FLAG(index_backfill_rpc_max_retries, runtime); + +DEFINE_int32(index_backfill_rpc_max_delay_ms, 10 * 60 * 1000, // 10 min. + "Maximum delay before retrying a backfill tablet chunk request " + "during index creation."); +TAG_FLAG(index_backfill_rpc_max_delay_ms, advanced); +TAG_FLAG(index_backfill_rpc_max_delay_ms, runtime); + +DEFINE_int32(index_backfill_wait_for_alter_table_completion_ms, 100, + "Delay before retrying to see if an in-progress alter table has " + "completed, during index backfill."); +TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, advanced); +TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, runtime); + +DEFINE_test_flag(int32, TEST_slowdown_backfill_alter_table_rpcs_ms, 0, + "Slows down the send alter table rpc's so that the master may be stopped between " + "different phases."); + +namespace yb { +namespace master { + +using namespace std::literals; +using strings::Substitute; +using tserver::TabletServerErrorPB; + +Status MultiStageAlterTable::UpdateIndexPermission( + CatalogManager *catalog_manager, + const scoped_refptr& indexed_table, + const TableId& index_table_id, + IndexPermissions new_perm) { + DVLOG(3) << __PRETTY_FUNCTION__ << yb::ToString(*indexed_table); + if (FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0) { + TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms); + DVLOG(3) << __PRETTY_FUNCTION__ << yb::ToString(*indexed_table) << " sleeping for " + << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms + << "ms BEFORE updating the index permission to " << IndexPermissions_Name(new_perm); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms)); + DVLOG(3) << __PRETTY_FUNCTION__ << "Done Sleeping"; + TRACE("Done Sleeping"); + } + { + TRACE("Locking indexed table"); + auto l = indexed_table->LockForWrite(); + auto &indexed_table_data = *l->mutable_data(); + + indexed_table_data.pb.mutable_fully_applied_schema()->CopyFrom( + indexed_table_data.pb.schema()); + VLOG(1) << "Setting fully_applied_schema_version to " + << indexed_table_data.pb.version(); + indexed_table_data.pb.set_fully_applied_schema_version( + indexed_table_data.pb.version()); + indexed_table_data.pb.mutable_fully_applied_indexes()->CopyFrom( + indexed_table_data.pb.indexes()); + if (indexed_table_data.pb.has_index_info()) { + indexed_table_data.pb.mutable_fully_applied_index_info()->CopyFrom( + indexed_table_data.pb.index_info()); + } + + bool updated = false; + auto old_schema_version = indexed_table_data.pb.version(); + for (int i = 0; i < indexed_table_data.pb.indexes_size(); i++) { + IndexInfoPB *idx_pb = indexed_table_data.pb.mutable_indexes(i); + if (idx_pb->table_id() == index_table_id) { + IndexPermissions old_perm = idx_pb->index_permissions(); + if (old_perm == new_perm) { + LOG(INFO) << "The index permission" + << " for index table id : " << yb::ToString(index_table_id) + << " seems to have already been updated to " + << IndexPermissions_Name(new_perm) + << " by somebody else. 
This is OK."; + return STATUS_SUBSTITUTE( + AlreadyPresent, "IndexPermissions for $0 is already $1", index_table_id, new_perm); + } + idx_pb->set_index_permissions(new_perm); + VLOG(1) << "Updating index permissions " + << " from " << IndexPermissions_Name(old_perm) << " to " + << IndexPermissions_Name(new_perm) << " schema_version from " << old_schema_version + << " to " << old_schema_version + 1 << ". New index info would be " + << yb::ToString(idx_pb); + updated = true; + break; + } + } + + if (!updated) { + LOG(WARNING) << "Could not find the desired index " + << yb::ToString(index_table_id) + << " to update in the indexed_table " + << yb::ToString(indexed_table_data.pb) + << "\nThis may be OK, if the index was deleted."; + return STATUS_SUBSTITUTE( + InvalidArgument, "Could not find the desired index $0", index_table_id); + } + + VLOG(1) << "Before updating indexed_table_data.pb.version() is " + << indexed_table_data.pb.version(); + indexed_table_data.pb.set_version(indexed_table_data.pb.version() + 1); + VLOG(1) << "After updating indexed_table_data.pb.version() is " + << indexed_table_data.pb.version(); + indexed_table_data.set_state(SysTablesEntryPB::ALTERING, + Substitute("Alter table version=$0 ts=$1", + indexed_table_data.pb.version(), + LocalTimeAsString())); + + // Update sys-catalog with the new indexed table info. + TRACE("Updating indexed table metadata on disk"); + RETURN_NOT_OK(catalog_manager->sys_catalog_->UpdateItem( + indexed_table.get(), catalog_manager->leader_ready_term_)); + + // Update the in-memory state. + TRACE("Committing in-memory state"); + l->Commit(); + } + if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0)) { + TRACE("Sleeping for $0 ms", + FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms); + DVLOG(3) << __PRETTY_FUNCTION__ << yb::ToString(*indexed_table) << " sleeping for " + << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms + << "ms AFTER updating the index permission to " << IndexPermissions_Name(new_perm); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms)); + DVLOG(3) << __PRETTY_FUNCTION__ << "Done Sleeping"; + TRACE("Done Sleeping"); + } + return Status::OK(); +} + +void MultiStageAlterTable::StartBackfillingData( + CatalogManager *catalog_manager, + const scoped_refptr &indexed_table, const IndexInfoPB index_pb) { + if (!indexed_table->IsBackfilling()) { + VLOG(1) << __func__ << " starting backfill for " + << indexed_table->ToString(); + indexed_table->SetIsBackfilling(true); + auto backfill_table = std::make_shared( + catalog_manager->master_, catalog_manager->worker_pool_.get(), + indexed_table, std::vector{index_pb}); + backfill_table->Launch(); + } else { + LOG(WARNING) << __func__ << " Not starting backfill for " + << indexed_table->ToString() << " one is already in progress"; + } +} + +bool MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( + CatalogManager* catalog_manager, const scoped_refptr& indexed_table) { + DVLOG(3) << __PRETTY_FUNCTION__ << yb::ToString(*indexed_table); + + // Add index info to indexed table and increment schema version. 
+ bool updated = false; + IndexInfoPB index_info_to_update; + { + TRACE("Locking indexed table"); + VLOG(1) << ("Locking indexed table"); + auto l = indexed_table->LockForRead(); + for (int i = 0; i < l->data().pb.indexes_size(); i++) { + const IndexInfoPB& idx_pb = l->data().pb.indexes(i); + if (idx_pb.has_index_permissions() && + idx_pb.index_permissions() < INDEX_PERM_READ_WRITE_AND_DELETE) { + index_info_to_update = idx_pb; + // Until we get to #2784, we'll have only one index being built at a time. + LOG_IF(DFATAL, updated) << "For now, we cannot have multiple indices built in parallel."; + updated = true; + } + } + } + + if (!updated) { + TRACE("Not necessary to launch next version"); + VLOG(1) << "Not necessary to launch next version : " << yb::ToString(index_info_to_update); + return false; + } + + const IndexPermissions old_perm = index_info_to_update.index_permissions(); + if (old_perm == INDEX_PERM_DELETE_ONLY || old_perm == INDEX_PERM_WRITE_AND_DELETE) { + const IndexPermissions new_perm = + (old_perm == INDEX_PERM_DELETE_ONLY ? INDEX_PERM_WRITE_AND_DELETE : INDEX_PERM_DO_BACKFILL); + if (UpdateIndexPermission( + catalog_manager, indexed_table, index_info_to_update.table_id(), new_perm) + .ok()) { + catalog_manager->SendAlterTableRequest(indexed_table); + } + } else { + LOG_IF(DFATAL, old_perm != INDEX_PERM_DO_BACKFILL) + << "Expect the old permission to be " + << "INDEX_PERM_DO_BACKFILL found " << IndexPermissions_Name(old_perm) + << " instead."; + TRACE("Starting backfill process"); + VLOG(1) << ("Starting backfill process"); + StartBackfillingData(catalog_manager, indexed_table.get(), index_info_to_update); + } + return true; +} + +// ----------------------------------------------------------------------------------------------- +// BackfillTable +// ----------------------------------------------------------------------------------------------- +void BackfillTable::Launch() { + LaunchComputeSafeTimeForRead(); +} + +void BackfillTable::LaunchComputeSafeTimeForRead() { + vector> tablets; + indexed_table_->GetAllTablets(&tablets); + + done_.store(false, std::memory_order_release); + tablets_pending_.store(tablets.size(), std::memory_order_release); + for (const scoped_refptr& tablet : tablets) { + auto get_safetime = std::make_shared(shared_from_this(), tablet); + get_safetime->Launch(); + } +} + +std::string BackfillTable::LogPrefix() const { + const TableId &index_table_id = indices()[0].table_id(); + return Format("Backfill Index Table(s) <$0>", index_table_id); +} + +void BackfillTable::UpdateSafeTime(const Status& s, HybridTime ht) { + if (!s.ok()) { + // Move on to ABORTED permission. + LOG_WITH_PREFIX(ERROR) + << "Failed backfill. Could not compute safe time for " + << yb::ToString(indexed_table_) << s; + if (!done_.exchange(true)) { + AlterTableStateToAbort(); + } else { + LOG_WITH_PREFIX(INFO) + << "Somebody else already aborted the index backfill."; + } + return; + } + + // Need to guard this. + { + std::lock_guard l(mutex_); + VLOG(2) << " Updating read_time_for_backfill_ to min{ " << read_time_for_backfill_.ToString() + << ", " << ht.ToString() << " }."; + read_time_for_backfill_.MakeAtMost(ht); + } + + // Once all the tablets have reported, launch the backfill using the computed safe time. 
+ if (!done() && --tablets_pending_ == 0) { + { + std::lock_guard l(mutex_); + LOG_WITH_PREFIX(INFO) << "Completed fetching SafeTime for the table " + << yb::ToString(indexed_table_) << " will be using " + << read_time_for_backfill_.ToString(); + } + LaunchBackfill(); + } +} + +void BackfillTable::LaunchBackfill() { + vector> tablets; + indexed_table_->GetAllTablets(&tablets); + + done_.store(false, std::memory_order_release); + // TODO: Keep track of explicit tablet names to handle RPC retries. + tablets_pending_.store(tablets.size(), std::memory_order_release); + for (const scoped_refptr& tablet : tablets) { + auto backfill_tablet = std::make_shared(shared_from_this(), tablet); + backfill_tablet->Launch(); + } +} + +void BackfillTable::Done(const Status& s) { + if (!s.ok()) { + // Move on to ABORTED permission. + LOG_WITH_PREFIX(ERROR) << "Failed to backfill the index " << s; + if (!done_.exchange(true)) { + AlterTableStateToAbort(); + } else { + LOG_WITH_PREFIX(INFO) + << "Somebody else already aborted the index backfill."; + } + return; + } + + // If OK then move on to READ permissions. + if (!done() && --tablets_pending_ == 0) { + LOG_WITH_PREFIX(INFO) << "Completed backfilling the index table."; + done_.store(true, std::memory_order_release); + AlterTableStateToSuccess(); + } +} + +void BackfillTable::AlterTableStateToSuccess() { + const TableId& index_table_id = indices()[0].table_id(); + if (MultiStageAlterTable::UpdateIndexPermission( + master_->catalog_manager(), indexed_table_, index_table_id, + INDEX_PERM_READ_WRITE_AND_DELETE).ok()) { + VLOG(1) << "Sending alter table requests to the Indexed table"; + master_->catalog_manager()->SendAlterTableRequest(indexed_table_); + VLOG(1) << "DONE Sending alter table requests to the Indexed table"; + AllowCompactionsToGCDeleteMarkers(index_table_id); + } + indexed_table_->SetIsBackfilling(false); + ClearCheckpointStateInTablets(); +} + +void BackfillTable::AlterTableStateToAbort() { + const TableId& index_table_id = indices()[0].table_id(); + MultiStageAlterTable::UpdateIndexPermission( + master_->catalog_manager(), indexed_table_, index_table_id, INDEX_PERM_BACKFILL_FAILED); + master_->catalog_manager()->SendAlterTableRequest(indexed_table_); + indexed_table_->SetIsBackfilling(false); + ClearCheckpointStateInTablets(); +} + +void BackfillTable::ClearCheckpointStateInTablets() { + vector> tablets; + indexed_table_->GetAllTablets(&tablets); + std::vector tablet_ptrs; + for (scoped_refptr& tablet : tablets) { + tablet_ptrs.push_back(tablet.get()); + tablet->mutable_metadata()->StartMutation(); + tablet->mutable_metadata()->mutable_dirty()->pb.clear_backfilled_until(); + } + auto term = master()->catalog_manager()->leader_ready_term(); + WARN_NOT_OK( + master()->catalog_manager()->sys_catalog()->UpdateItems(tablet_ptrs, term), + "Could not persist that the table is done backfilling."); + for (scoped_refptr& tablet : tablets) { + VLOG(2) << "Done backfilling the table. 
" << yb::ToString(tablet) + << " clearing backfilled_until"; + tablet->mutable_metadata()->CommitMutation(); + } +} + +void BackfillTable::AllowCompactionsToGCDeleteMarkers(const TableId& index_table_id) { + DVLOG(3) << __PRETTY_FUNCTION__; + scoped_refptr index_table_info; + TableIdentifierPB index_table_id_pb; + index_table_id_pb.set_table_id(index_table_id); + { + if (!master_->catalog_manager()->FindTable(index_table_id_pb, &index_table_info).ok()) { + LOG_WITH_PREFIX(ERROR) + << "Could not find table info for the index table " + << yb::ToString(index_table_id) << " to enable compactions. " + << "This is ok in case somebody issued a delete index."; + return; + } + } + // Add a sleep here to wait until the Table is fully created. + bool is_ready = false; + bool first_run = true; + do { + if (!first_run) { + YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting for the previous alter table to " + "complete on the index table " + << yb::ToString(index_table_id); + SleepFor( + MonoDelta::FromMilliseconds(FLAGS_index_backfill_wait_for_alter_table_completion_ms)); + } + first_run = false; + { + VLOG(2) << __func__ << ": Trying to lock index table for Read"; + auto l = index_table_info->LockForRead(); + is_ready = (l->data().pb.state() == SysTablesEntryPB::RUNNING); + } + VLOG(2) << __func__ << ": Unlocked index table for Read"; + } while (!is_ready); + { + TRACE("Locking index table"); + VLOG(2) << __func__ << ": Trying to lock index table for Write"; + auto l = index_table_info->LockForWrite(); + VLOG(2) << __func__ << ": locked index table for Write"; + l->mutable_data()->pb.mutable_schema()->mutable_table_properties()->set_is_backfilling(false); + + // Update sys-catalog with the new indexed table info. + TRACE("Updating index table metadata on disk"); + const auto leader_term = master_->catalog_manager()->leader_ready_term_; + auto status = + master_->catalog_manager()->sys_catalog_->UpdateItem(index_table_info.get(), leader_term); + if (!status.ok()) { + LOG_WITH_PREFIX(ERROR) << "Could not update index_table_info for " + << index_table_id + << " to enable compactions: " << status; + return; + } + + // Update the in-memory state. 
+ TRACE("Committing in-memory state"); + l->Commit(); + } + VLOG(2) << __func__ << ": Unlocked index table for Read"; + VLOG(1) << "Sending backfill done requests to the Index table"; + SendRpcToAllowCompactionsToGCDeleteMarkers(index_table_info); + VLOG(1) << "DONE Sending backfill done requests to the Index table"; +} + +void BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers( + const scoped_refptr& table) { + vector> tablets; + table->GetAllTablets(&tablets); + + for (const scoped_refptr& tablet : tablets) { + SendRpcToAllowCompactionsToGCDeleteMarkers(tablet); + } +} + +void BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers( + const scoped_refptr& tablet) { + auto call = std::make_shared(master_, callback_pool_, tablet); + tablet->table()->AddTask(call); + WARN_NOT_OK(call->Run(), "Failed to send backfill done request"); +} + +// ----------------------------------------------------------------------------------------------- +// BackfillTablet +// ----------------------------------------------------------------------------------------------- +BackfillTablet::BackfillTablet( + std::shared_ptr backfill_table, const scoped_refptr& tablet) + : backfill_table_(backfill_table), tablet_(tablet) { + { + auto l = tablet_->LockForRead(); + Partition::FromPB(tablet_->metadata().state().pb.partition(), &partition_); + if (tablet_->metadata().state().pb.has_backfilled_until()) { + chunk_start_ = tablet_->metadata().state().pb.backfilled_until(); + } else { + chunk_start_ = partition_.partition_key_start(); + } + } + if (chunk_start_ != partition_.partition_key_start()) { + VLOG(1) << tablet_->ToString() << " resuming backfill from " << yb::ToString(chunk_start_) + << " (instead of begining from " << yb::ToString(partition_.partition_key_start()); + } else { + VLOG(1) << tablet_->ToString() << " begining backfill from " << yb::ToString(chunk_start_); + } + chunk_end_ = GetChunkEnd(chunk_start_); +} + +void BackfillTablet::LaunchNextChunk() { + if (!backfill_table_->done()) { + auto chunk = std::make_shared(shared_from_this(), chunk_start_, chunk_end_); + chunk->Launch(); + } +} + +void BackfillTablet::Done(const Status& status) { + if (!status.ok()) { + LOG(INFO) << "Failed to backfill the tablet " << yb::ToString(tablet_) << status; + backfill_table_->Done(status); + return; + } + + processed_until_ = chunk_end_; + VLOG(2) << "Done backfilling the tablet " << yb::ToString(tablet_) << " until " + << yb::ToString(processed_until_); + { + tablet_->mutable_metadata()->StartMutation(); + tablet_->mutable_metadata()->mutable_dirty()->pb.set_backfilled_until(processed_until_); + auto term = backfill_table_->master()->catalog_manager()->leader_ready_term(); + WARN_NOT_OK( + backfill_table_->master()->catalog_manager()->sys_catalog()->UpdateItem( + tablet_.get(), term), + "Could not persist that the tablet is done backfilling."); + tablet_->mutable_metadata()->CommitMutation(); + } + + chunk_start_ = processed_until_; + chunk_end_ = GetChunkEnd(chunk_start_); + + // This is the last chunk. 
+ if (chunk_start_ == chunk_end_) { + VLOG(1) << "Done backfilling the tablet " << yb::ToString(tablet_); + backfill_table_->Done(status); + return; + } + + LaunchNextChunk(); +} + +// ----------------------------------------------------------------------------------------------- +// GetSafeTimeForTablet +// ----------------------------------------------------------------------------------------------- + +void GetSafeTimeForTablet::Launch() { + tablet_->table()->AddTask(shared_from_this()); + Status status = Run(); + WARN_NOT_OK(status, Substitute("Failed to send GetSafeTime request for $0", tablet_->ToString())); + + // Need to print this after Run() because that's where it picks the TS which description() + // needs. + if (status.ok()) { + VLOG(3) << "Started GetSafeTimeForTablet : " << this->description(); + } +} + +bool GetSafeTimeForTablet::SendRequest(int attempt) { + VLOG(1) << __PRETTY_FUNCTION__; + tserver::GetSafeTimeRequestPB req; + req.set_dest_uuid(permanent_uuid()); + req.set_tablet_id(tablet_->tablet_id()); + req.set_propagated_hybrid_time(backfill_table_->master()->clock()->Now().ToUint64()); + + ts_admin_proxy_->GetSafeTimeAsync(req, &resp_, &rpc_, BindRpcCallback()); + VLOG(1) << "Send " << description() << " to " << permanent_uuid() + << " (attempt " << attempt << "):\n" + << req.DebugString(); + return true; +} + +void GetSafeTimeForTablet::HandleResponse(int attempt) { + VLOG(1) << __PRETTY_FUNCTION__; + Status status = Status::OK(); + if (resp_.has_error()) { + status = StatusFromPB(resp_.error().status()); + + // Do not retry on a fatal error + switch (resp_.error().code()) { + case TabletServerErrorPB::TABLET_NOT_FOUND: + case TabletServerErrorPB::MISMATCHED_SCHEMA: + case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA: + case TabletServerErrorPB::OPERATION_NOT_SUPPORTED: + LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet " + << tablet_->ToString() << " no further retry: " << status.ToString(); + TransitionToTerminalState(MonitoredTaskState::kRunning, MonitoredTaskState::kFailed); + break; + default: + LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet " + << tablet_->ToString() << ": " << status.ToString() << " code " + << resp_.error().code(); + break; + } + } else { + TransitionToTerminalState(MonitoredTaskState::kRunning, MonitoredTaskState::kComplete); + // 1 + VLOG(1) << "TS " << permanent_uuid() << ": GetSafeTime complete on tablet " + << tablet_->ToString(); + } + + server::UpdateClock(resp_, master_->clock()); +} + +void GetSafeTimeForTablet::UnregisterAsyncTaskCallback() { + Status status; + HybridTime safe_time; + if (resp_.has_error()) { + status = StatusFromPB(resp_.error().status()); + VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got an error. 
Returning " + << safe_time; + } else { + safe_time = HybridTime(resp_.safe_time()); + if (safe_time.is_special()) { + LOG(ERROR) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time; + } else { + VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time; + } + } + backfill_table_->UpdateSafeTime(status, safe_time); +} + +// ----------------------------------------------------------------------------------------------- +// BackfillChunk +// ----------------------------------------------------------------------------------------------- +void BackfillChunk::Launch() { + backfill_tablet_->tablet()->table()->AddTask(shared_from_this()); + Status status = Run(); + WARN_NOT_OK( + status, Substitute( + "Failed to send backfill Chunk request for $0", + backfill_tablet_->tablet().get()->ToString())); + + // Need to print this after Run() because that's where it picks the TS which description() + // needs. + if (status.ok()) { + LOG(INFO) << "Started BackfillChunk : " << this->description(); + } +} + +MonoTime BackfillChunk::ComputeDeadline() { + MonoTime timeout = MonoTime::Now(); + // We expect this RPC to take a long time. + // Allow this RPC about 1 hour before retrying it. + timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_index_backfill_rpc_timeout_ms)); + return MonoTime::Earliest(timeout, deadline_); +} + +int BackfillChunk::num_max_retries() { + return FLAGS_index_backfill_rpc_max_retries; +} + +int BackfillChunk::max_delay_ms() { + return FLAGS_index_backfill_rpc_max_delay_ms; +} + +bool BackfillChunk::SendRequest(int attempt) { + VLOG(1) << __PRETTY_FUNCTION__; + tserver::BackfillIndexRequestPB req; + req.set_dest_uuid(permanent_uuid()); + req.set_tablet_id(backfill_tablet_->tablet()->tablet_id()); + req.set_read_at_hybrid_time(backfill_tablet_->read_time_for_backfill().ToUint64()); + req.set_schema_version(backfill_tablet_->schema_version()); + req.set_start_key(start_key_); + req.set_end_key(end_key_); + for (const IndexInfoPB& idx_info : backfill_tablet_->indices()) { + req.add_indexes()->CopyFrom(idx_info); + } + req.set_propagated_hybrid_time(backfill_tablet_->master()->clock()->Now().ToUint64()); + + ts_admin_proxy_->BackfillIndexAsync(req, &resp_, &rpc_, BindRpcCallback()); + // 1 + VLOG(1) << "Send " << description() << " to " << permanent_uuid() + << " (attempt " << attempt << "):\n" + << req.DebugString(); + return true; +} + +void BackfillChunk::HandleResponse(int attempt) { + VLOG(1) << __PRETTY_FUNCTION__; + Status status; + if (resp_.has_error()) { + status = StatusFromPB(resp_.error().status()); + + // Do not retry on a fatal error + switch (resp_.error().code()) { + case TabletServerErrorPB::TABLET_NOT_FOUND: + case TabletServerErrorPB::MISMATCHED_SCHEMA: + case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA: + case TabletServerErrorPB::OPERATION_NOT_SUPPORTED: + LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet " + << backfill_tablet_->tablet()->ToString() + << " no further retry: " << status.ToString(); + TransitionToTerminalState(MonitoredTaskState::kRunning, MonitoredTaskState::kFailed); + break; + default: + LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet " + << backfill_tablet_->tablet()->ToString() << ": " << status.ToString() + << " code " << resp_.error().code(); + break; + } + } else { + TransitionToTerminalState(MonitoredTaskState::kRunning, MonitoredTaskState::kComplete); + // 1 + VLOG(1) << "TS " << permanent_uuid() << ": backfill complete on tablet " + << 
backfill_tablet_->tablet()->ToString(); + } + + server::UpdateClock(resp_, master_->clock()); +} + +void BackfillChunk::UnregisterAsyncTaskCallback() { + Status status; + if (resp_.has_error()) { + status = StatusFromPB(resp_.error().status()); + } + backfill_tablet_->Done(status); +} + +} // namespace master +} // namespace yb diff --git a/src/yb/master/backfill_index.h b/src/yb/master/backfill_index.h new file mode 100644 index 000000000000..d391f56f1774 --- /dev/null +++ b/src/yb/master/backfill_index.h @@ -0,0 +1,292 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_MASTER_BACKFILL_INDEX_H +#define YB_MASTER_BACKFILL_INDEX_H + +#include +#include + +#include "yb/common/index.h" +#include "yb/common/partition.h" +#include "yb/master/async_rpc_tasks.h" +#include "yb/master/catalog_entity_info.h" +#include "yb/server/monitored_task.h" +#include "yb/util/format.h" +#include "yb/util/locks.h" +#include "yb/util/monotime.h" +#include "yb/util/status.h" + +namespace yb { +namespace master { + +class CatalogManager; + +// Implements a multi-stage alter table. As of Dec 30 2019, used for adding an +// index to an existing table, such that the index can be backfilled with +// historic data in an online manner. +// +class MultiStageAlterTable { + public: + // Launches the next stage of the multi stage schema change. Updates the + // table info, upon the completion of an alter table round if we are in the + // middle of an index backfill. Will update the IndexPermission from + // INDEX_PERM_DELETE_ONLY -> INDEX_PERM_WRITE_AND_DELETE -> BACKFILL + // Returns true if the next version of table schema/info was kicked off. + // Returns false otherwise -- so that the alter table operation can be + // considered to have completed. + static bool LaunchNextTableInfoVersionIfNecessary( + CatalogManager* mgr, const scoped_refptr& Info); + + // Updates and persists the IndexPermission corresponding to the index_table_id for + // the indexed_table's TableInfo. + static Status UpdateIndexPermission( + CatalogManager* mgr, const scoped_refptr& indexed_table, + const TableId& index_table_id, IndexPermissions new_perm); + + private: + // Start Index Backfill process/step for the specified table/index. + static void + StartBackfillingData(CatalogManager *catalog_manager, + const scoped_refptr &indexed_table, + IndexInfoPB idx_info); +}; + +class BackfillTablet; +class BackfillChunk; + +// This class is responsible for backfilling the specified indices on the +// indexed_table. +class BackfillTable : public std::enable_shared_from_this { + public: + BackfillTable(Master *master, ThreadPool *callback_pool, + const scoped_refptr &indexed_table, + std::vector indices) + : master_(master), callback_pool_(callback_pool), + indexed_table_(indexed_table), indices_to_build_(indices) { + LOG_IF(DFATAL, indices_to_build_.size() != 1) + << "As of Dec 2019, we only support " + << "building one index at a time. 
indices_to_build_.size() = " + << indices_to_build_.size(); + auto l = indexed_table_->LockForRead(); + schema_version_ = indexed_table_->metadata().state().pb.version(); + } + + void Launch(); + + void UpdateSafeTime(const Status& s, HybridTime ht); + + void Done(const Status& s); + + Master* master() { return master_; } + + ThreadPool* threadpool() { return callback_pool_; } + + const std::vector& indices() const { return indices_to_build_; } + + int32_t schema_version() const { return schema_version_; } + + std::string LogPrefix() const; + + bool done() const { + return done_.load(std::memory_order_acquire); + } + + HybridTime read_time_for_backfill() const { + std::lock_guard l(mutex_); + return read_time_for_backfill_; + } + + private: + void LaunchComputeSafeTimeForRead(); + + void LaunchBackfill(); + + void AlterTableStateToSuccess(); + + void AlterTableStateToAbort(); + + void ClearCheckpointStateInTablets(); + + // We want to prevent major compactions from garbage collecting delete markers + // on an index table, until the backfill process is complete. + // This API is used at the end of a successful backfill to enable major compactions + // to gc delete markers on an index table. + void AllowCompactionsToGCDeleteMarkers(const TableId& index_table_id); + + // Send the "backfill done request" to all tablets of the specified table. + void SendRpcToAllowCompactionsToGCDeleteMarkers(const scoped_refptr& index_table); + + // Send the "backfill done request" to the specified tablet. + void SendRpcToAllowCompactionsToGCDeleteMarkers( + const scoped_refptr& index_table_tablet); + + Master* master_; + ThreadPool* callback_pool_; + const scoped_refptr indexed_table_; + const std::vector indices_to_build_; + int32_t schema_version_; + + std::atomic_bool done_; + std::atomic tablets_pending_; + mutable simple_spinlock mutex_; + HybridTime read_time_for_backfill_ GUARDED_BY(mutex_){HybridTime::kMax}; +}; + +// A background task which is responsible for backfilling rows from a given +// tablet in the indexed table. +class BackfillTablet : public std::enable_shared_from_this { + public: + BackfillTablet( + std::shared_ptr backfill_table, const scoped_refptr& tablet); + + void Launch() { LaunchNextChunk(); } + + void LaunchNextChunk(); + void Done(const Status& status); + + Master* master() { return backfill_table_->master(); } + + ThreadPool* threadpool() { return backfill_table_->threadpool(); } + + HybridTime read_time_for_backfill() { + return backfill_table_->read_time_for_backfill(); + } + + const std::vector& indices() { return backfill_table_->indices(); } + + int32_t schema_version() { return backfill_table_->schema_version(); } + + // Returns the partition key corresponding to the end of this chunk. This is encoded + // using the same hashing/partition scheme as used by the main/indexed table. + // As of Dec 2019, we consider the whole tablet range to be one chunk. But the plan is + // to subdivide this into smaller chunks going forward for better checkpointing + // and restartability (#2615). + std::string GetChunkEnd(std::string start) { + // TODO(#2615) : Ideally we want to split one tablet into multiple chunks, so that + // each chunk can make progress and finish relatively quickly. 
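+    // Until that lands, the end key returned here is simply the tablet's partition end key:
+    // the whole tablet is treated as one chunk and the "start" argument is not yet used.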
+ return partition_.partition_key_end(); + } + + const scoped_refptr tablet() { return tablet_; } + + private: + std::shared_ptr backfill_table_; + const scoped_refptr tablet_; + Partition partition_; + + // partition keys corresponding to the start/end of the chunk being processed, + // and how far backfill has been already processed. + std::string chunk_start_, chunk_end_, processed_until_; +}; + +class GetSafeTimeForTablet : public RetryingTSRpcTask { + public: + GetSafeTimeForTablet( + std::shared_ptr backfill_table, const scoped_refptr& tablet) + : RetryingTSRpcTask( + backfill_table->master(), backfill_table->threadpool(), + gscoped_ptr(new PickLeaderReplica(tablet)), tablet->table().get()), + backfill_table_(backfill_table), + tablet_(tablet) { + deadline_ = MonoTime::Max(); // Never time out. + } + + void Launch(); + + Type type() const override { return ASYNC_GET_SAFE_TIME; } + + std::string type_name() const override { return "Get SafeTime for Tablet"; } + + std::string description() const override { + return yb::Format( + "Fetch SafeTime for Backfilling index tablet for $0. Indices $1", + tablet_, + backfill_table_->indices()); + } + + private: + TabletId tablet_id() const override { return tablet_->id(); } + + void HandleResponse(int attempt) override; + + bool SendRequest(int attempt) override; + + void UnregisterAsyncTaskCallback() override; + + TabletServerId permanent_uuid() { + return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : ""; + } + + tserver::GetSafeTimeResponsePB resp_; + std::shared_ptr backfill_table_; + const scoped_refptr tablet_; +}; + +// A background task which is responsible for backfilling rows in the partitions +// [start, end) on the indexed table. +class BackfillChunk : public RetryingTSRpcTask { + public: + BackfillChunk( + std::shared_ptr backfill_tablet, const std::string& start, + const std::string& end) + : RetryingTSRpcTask( + backfill_tablet->master(), backfill_tablet->threadpool(), + gscoped_ptr(new PickLeaderReplica(backfill_tablet->tablet())), + backfill_tablet->tablet()->table().get()), + backfill_tablet_(backfill_tablet), + start_key_(start), + end_key_(end) { + deadline_ = MonoTime::Max(); // Never time out. + } + + void Launch(); + + Type type() const override { return ASYNC_BACKFILL_TABLET_CHUNK; } + + std::string type_name() const override { return "Backfill Index Table"; } + + std::string description() const override { + return yb::Format( + "Backfilling index tablet for $0 from $1 to $2 for indices $3", + backfill_tablet_->tablet(), "start_key_", "end_key_", + backfill_tablet_->indices()); + } + + MonoTime ComputeDeadline() override; + + private: + TabletId tablet_id() const override { return backfill_tablet_->tablet()->id(); } + + void HandleResponse(int attempt) override; + + bool SendRequest(int attempt) override; + + void UnregisterAsyncTaskCallback() override; + + TabletServerId permanent_uuid() { + return target_ts_desc_ != nullptr ? 
target_ts_desc_->permanent_uuid() : ""; + } + + int num_max_retries() override; + int max_delay_ms() override; + + tserver::BackfillIndexResponsePB resp_; + std::shared_ptr backfill_tablet_; + std::string start_key_, end_key_; +}; + +} // namespace master +} // namespace yb + +#endif // YB_MASTER_BACKFILL_INDEX_H diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index f19e163e57e8..5a4ca29aa723 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -376,6 +376,14 @@ class TableInfo : public RefCountedThreadSafe, // Returns true if the table creation is in-progress. bool IsCreateInProgress() const; + // Returns true if the table is backfilling an index. + bool IsBackfilling() const { + return is_backfilling_; + } + void SetIsBackfilling(bool flag) { + is_backfilling_ = flag; + } + // Returns true if an "Alter" operation is in-progress. bool IsAlterInProgress(uint32_t version) const; @@ -419,6 +427,9 @@ class TableInfo : public RefCountedThreadSafe, // If closing, requests to AddTask will be promptly aborted. bool closing_ = false; + // In memory state set during backfill to prevent multiple backfill jobs. + bool is_backfilling_ = false; + // List of pending tasks (e.g. create/alter tablet requests). std::unordered_set> pending_tasks_; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 13f016f25cb7..99f9950626b5 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -87,40 +87,41 @@ #include "yb/gutil/strings/substitute.h" #include "yb/gutil/sysinfo.h" #include "yb/gutil/walltime.h" +#include "yb/master/async_rpc_tasks.h" +#include "yb/master/backfill_index.h" +#include "yb/master/catalog_loaders.h" +#include "yb/master/catalog_manager_bg_tasks.h" #include "yb/master/catalog_manager_util.h" #include "yb/master/cluster_balance.h" +#include "yb/master/encryption_manager.h" #include "yb/master/master.h" #include "yb/master/master.pb.h" #include "yb/master/master.proxy.h" #include "yb/master/master_util.h" #include "yb/master/sys_catalog_constants.h" -#include "yb/master/system_tablet.h" +#include "yb/master/sys_catalog_initialization.h" #include "yb/master/sys_catalog.h" +#include "yb/master/system_tablet.h" +#include "yb/master/tasks_tracker.h" #include "yb/master/ts_descriptor.h" #include "yb/master/ts_manager.h" -#include "yb/master/async_rpc_tasks.h" -#include "yb/master/yql_auth_roles_vtable.h" -#include "yb/master/yql_auth_role_permissions_vtable.h" +#include "yb/master/yql_aggregates_vtable.h" #include "yb/master/yql_auth_resource_role_permissions_index.h" +#include "yb/master/yql_auth_role_permissions_vtable.h" +#include "yb/master/yql_auth_roles_vtable.h" #include "yb/master/yql_columns_vtable.h" #include "yb/master/yql_empty_vtable.h" +#include "yb/master/yql_functions_vtable.h" +#include "yb/master/yql_indexes_vtable.h" #include "yb/master/yql_keyspaces_vtable.h" #include "yb/master/yql_local_vtable.h" +#include "yb/master/yql_partitions_vtable.h" #include "yb/master/yql_peers_vtable.h" +#include "yb/master/yql_size_estimates_vtable.h" #include "yb/master/yql_tables_vtable.h" -#include "yb/master/yql_aggregates_vtable.h" -#include "yb/master/yql_functions_vtable.h" -#include "yb/master/yql_indexes_vtable.h" #include "yb/master/yql_triggers_vtable.h" #include "yb/master/yql_types_vtable.h" #include "yb/master/yql_views_vtable.h" -#include "yb/master/yql_partitions_vtable.h" -#include "yb/master/yql_size_estimates_vtable.h" -#include 
"yb/master/catalog_manager_bg_tasks.h" -#include "yb/master/catalog_loaders.h" -#include "yb/master/sys_catalog_initialization.h" -#include "yb/master/tasks_tracker.h" -#include "yb/master/encryption_manager.h" #include "yb/tserver/ts_tablet_manager.h" #include "yb/rpc/messenger.h" @@ -132,6 +133,7 @@ #include "yb/tserver/tserver_admin.proxy.h" #include "yb/util/crypt.h" +#include "yb/util/debug-util.h" #include "yb/util/debug/trace_event.h" #include "yb/util/flag_tags.h" #include "yb/util/logging.h" @@ -244,6 +246,11 @@ DEFINE_uint64(metrics_snapshots_table_num_tablets, 0, "Number of tablets to use when creating the metrics snapshots table." "0 to use the same default num tablets as for regular tables."); +DEFINE_bool(disable_index_backfill, true, // Temporarily disabled until all diffs land. + "A kill switch to disable multi-stage backfill for the created indices."); +TAG_FLAG(disable_index_backfill, runtime); +TAG_FLAG(disable_index_backfill, hidden); + DEFINE_bool( hide_pg_catalog_table_creation_logs, false, "Whether to hide detailed log messages for PostgreSQL catalog table creation. " @@ -366,6 +373,7 @@ namespace { class IndexInfoBuilder { public: explicit IndexInfoBuilder(IndexInfoPB* index_info) : index_info_(*index_info) { + DVLOG(3) << " After " << __PRETTY_FUNCTION__ << " index_info_ is " << yb::ToString(index_info_); } void ApplyProperties(const TableId& indexed_table_id, bool is_local, bool is_unique) { @@ -373,6 +381,7 @@ class IndexInfoBuilder { index_info_.set_version(0); index_info_.set_is_local(is_local); index_info_.set_is_unique(is_unique); + DVLOG(3) << " After " << __PRETTY_FUNCTION__ << " index_info_ is " << yb::ToString(index_info_); } CHECKED_STATUS ApplyColumnMapping(const Schema& indexed_schema, const Schema& index_schema) { @@ -396,6 +405,7 @@ class IndexInfoBuilder { i++) { index_info_.add_indexed_range_column_ids(indexed_schema.column_id(i)); } + DVLOG(3) << " After " << __PRETTY_FUNCTION__ << " index_info_ is " << yb::ToString(index_info_); return Status::OK(); } @@ -647,7 +657,7 @@ Status CatalogManager::VisitSysCatalog(int64_t term) { LOG(INFO) << __func__ << ": Acquire catalog manager lock_ before loading sys catalog.."; std::lock_guard lock(lock_); - VLOG(1) << __func__ << ": Acquired the catalog manager lock_"; + VLOG(3) << __func__ << ": Acquired the catalog manager lock_"; // Abort any outstanding tasks. All TableInfos are orphaned below, so // it's important to end their tasks now; otherwise Shutdown() will @@ -1410,6 +1420,8 @@ Status CatalogManager::ValidateTableReplicationInfo(const ReplicationInfoPB& rep Status CatalogManager::AddIndexInfoToTable(const scoped_refptr& indexed_table, const IndexInfoPB& index_info, CreateTableResponsePB* resp) { + LOG(INFO) << "AddIndexInfoToTable to " << indexed_table->ToString() << " IndexInfo " + << yb::ToString(index_info); TRACE("Locking indexed table"); auto l = DCHECK_NOTNULL(indexed_table)->LockForWrite(); RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(l.get(), resp)); @@ -1784,6 +1796,7 @@ Status CatalogManager::CopyPgsqlSysTables(const NamespaceId& namespace_id, Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, CreateTableResponsePB* resp, rpc::RpcContext* rpc) { + DVLOG(3) << __PRETTY_FUNCTION__ << " Begin. " << orig_req->DebugString(); RETURN_NOT_OK(CheckOnline()); const bool is_pg_table = orig_req->table_type() == PGSQL_TABLE_TYPE; @@ -1984,6 +1997,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, // For index table, populate the index info. 
IndexInfoPB index_info; + // Fetch the runtime flag to prevent any issues from the updates to flag while processing. + const bool disable_index_backfill = GetAtomicFlag(&FLAGS_disable_index_backfill); if (req.has_index_info()) { // Current message format. index_info.CopyFrom(req.index_info()); @@ -2009,6 +2024,14 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } } + if ((req.has_index_info() || req.has_indexed_table_id()) && !disable_index_backfill) { + // Start off the index table with major compactions disabled. We need this to preserve + // the delete markers until the backfill process is completed. + // No need to set index_permissions in the index table. + schema.SetIsBackfilling(true); + } + + LOG(INFO) << "CreateTable with IndexInfo " << yb::ToString(index_info); TSDescriptorVector all_ts_descs; master_->ts_manager()->GetAllLiveDescriptors(&all_ts_descs); s = CheckValidReplicationInfo(replication_info, all_ts_descs, partitions, resp); @@ -2115,6 +2138,9 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, // For index table, insert index info in the indexed table. if ((req.has_index_info() || req.has_indexed_table_id()) && !is_pg_table) { + if (!disable_index_backfill) { + index_info.set_index_permissions(INDEX_PERM_DELETE_ONLY); + } s = AddIndexInfoToTable(indexed_table, index_info, resp); if (PREDICT_FALSE(!s.ok())) { return AbortTableCreation(table.get(), tablets, @@ -2166,6 +2192,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } } + DVLOG(3) << __PRETTY_FUNCTION__ << " Done."; return Status::OK(); } @@ -3300,6 +3327,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(l.get(), resp)); bool has_changes = false; + auto& table_pb = l->mutable_data()->pb; const TableName table_name = l->data().name(); const NamespaceId namespace_id = l->data().namespace_id(); const TableName new_table_name = req->has_new_table_name() ? req->new_table_name() : table_name; @@ -3328,6 +3356,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, } std::lock_guard catalog_lock(lock_); + VLOG(3) << __func__ << ": Acquired the catalog manager lock_"; TRACE("Acquired catalog manager lock"); @@ -3343,8 +3372,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // Acquire the new table name (now we have 2 name for the same table). table_names_map_[{new_namespace_id, new_table_name}] = table; - l->mutable_data()->pb.set_namespace_id(new_namespace_id); - l->mutable_data()->pb.set_name(new_table_name); + table_pb.set_namespace_id(new_namespace_id); + table_pb.set_name(new_table_name); has_changes = true; } @@ -3359,7 +3388,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, } // TODO(hector): Handle co-partitioned tables: // https://github.com/YugaByte/yugabyte-db/issues/1905. - l->mutable_data()->pb.set_wal_retention_secs(req->wal_retention_secs()); + table_pb.set_wal_retention_secs(req->wal_retention_secs()); has_changes = true; } @@ -3371,21 +3400,30 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // Serialize the schema Increment the version number. 
if (new_schema.initialized()) { if (!l->data().pb.has_fully_applied_schema()) { - l->mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l->data().pb.schema()); + table_pb.mutable_fully_applied_schema()->CopyFrom(l->data().pb.schema()); + // The idea here is that if we are in the middle of updating the schema + // from one state to another, then YBClients will be given the older + // version until the schema is updated on all the tablets. + // As of Dec 2019, this may lead to some rejected operations/retries during + // the index backfill. See #3284 for possible optimizations. + table_pb.set_fully_applied_schema_version(l->data().pb.version()); + table_pb.mutable_fully_applied_indexes()->CopyFrom(l->data().pb.indexes()); + if (l->data().pb.has_index_info()) { + table_pb.mutable_fully_applied_index_info()->CopyFrom(l->data().pb.index_info()); + } } - SchemaToPB(new_schema, l->mutable_data()->pb.mutable_schema()); + SchemaToPB(new_schema, table_pb.mutable_schema()); } // Only increment the version number if it is a schema change (AddTable change goes through a // different path and it's not processed here). if (!req->has_wal_retention_secs()) { - l->mutable_data()->pb.set_version(l->mutable_data()->pb.version() + 1); + table_pb.set_version(table_pb.version() + 1); } - l->mutable_data()->pb.set_next_column_id(next_col_id); - l->mutable_data()->set_state(SysTablesEntryPB::ALTERING, - Substitute("Alter table version=$0 ts=$1", - l->mutable_data()->pb.version(), - LocalTimeAsString())); + table_pb.set_next_column_id(next_col_id); + l->mutable_data()->set_state( + SysTablesEntryPB::ALTERING, + Substitute("Alter table version=$0 ts=$1", table_pb.version(), LocalTimeAsString())); // Update sys-catalog with the new table schema. TRACE("Updating metadata on disk"); @@ -3398,6 +3436,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, if (table->GetTableType() != PGSQL_TABLE_TYPE && (req->has_new_namespace() || req->has_new_table_name())) { std::lock_guard catalog_lock(lock_); + VLOG(3) << __func__ << ": Acquired the catalog manager lock_"; CHECK_EQ(table_names_map_.erase({new_namespace_id, new_table_name}), 1); } // TableMetadaLock follows RAII paradigm: when it leaves scope, @@ -3478,9 +3517,28 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, // schema that has reached every TS. CHECK(l->data().pb.state() == SysTablesEntryPB::ALTERING); resp->mutable_schema()->CopyFrom(l->data().pb.fully_applied_schema()); + resp->set_version(l->data().pb.fully_applied_schema_version()); + resp->mutable_indexes()->CopyFrom(l->data().pb.fully_applied_indexes()); + if (l->data().pb.has_fully_applied_index_info()) { + *resp->mutable_index_info() = l->data().pb.fully_applied_index_info(); + resp->set_obsolete_indexed_table_id(PROTO_GET_INDEXED_TABLE_ID(l->data().pb)); + } + VLOG(1) << " Returning " + << " fully_applied_schema with version " << l->data().pb.fully_applied_schema_version() + << " : \n" + << yb::ToString(l->data().pb.fully_applied_indexes()) << "\n instead of version " + << l->data().pb.version() << "\n" + << yb::ToString(l->data().pb.indexes()); } else { // There's no AlterTable, the regular schema is "fully applied". 
resp->mutable_schema()->CopyFrom(l->data().pb.schema()); + resp->set_version(l->data().pb.version()); + resp->mutable_indexes()->CopyFrom(l->data().pb.indexes()); + if (l->data().pb.has_index_info()) { + *resp->mutable_index_info() = l->data().pb.index_info(); + resp->set_obsolete_indexed_table_id(PROTO_GET_INDEXED_TABLE_ID(l->data().pb)); + } + VLOG(1) << " Returning pb.schema() "; } // TODO(bogdan): add back in replication_info once we allow overrides! resp->mutable_partition_schema()->CopyFrom(l->data().pb.partition_schema()); @@ -3495,12 +3553,6 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, if (FindNamespace(nsid, &nsinfo).ok()) { resp->mutable_identifier()->mutable_namespace_()->set_name(nsinfo->name()); } - resp->set_version(l->data().pb.version()); - resp->mutable_indexes()->CopyFrom(l->data().pb.indexes()); - if (l->data().pb.has_index_info()) { - *resp->mutable_index_info() = l->data().pb.index_info(); - resp->set_obsolete_indexed_table_id(PROTO_GET_INDEXED_TABLE_ID(l->data().pb)); - } // Get namespace name by id. SharedLock l_map(lock_); @@ -3508,13 +3560,15 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, const scoped_refptr ns = FindPtrOrNull(namespace_ids_map_, table->namespace_id()); if (ns == nullptr) { - Status s = STATUS_SUBSTITUTE(NotFound, - "Could not find namespace by namespace id $0 for request $1.", + Status s = STATUS_SUBSTITUTE( + NotFound, "Could not find namespace by namespace id $0 for request $1.", table->namespace_id(), req->DebugString()); return SetupError(resp->mutable_error(), MasterErrorPB::NAMESPACE_NOT_FOUND, s); } resp->mutable_identifier()->mutable_namespace_()->set_name(ns->name()); + VLOG(1) << "Serviced GetTableSchema request for " << req->ShortDebugString() << " with " + << yb::ToString(*resp); return Status::OK(); } @@ -4272,6 +4326,7 @@ Status CatalogManager::DeleteNamespace(const DeleteNamespaceRequestPB* req, TRACE("Looking for tables in the namespace"); { SharedLock catalog_lock(lock_); + VLOG(3) << __func__ << ": Acquired the catalog manager lock_"; for (const TableInfoMap::value_type& entry : *table_ids_map_) { auto ltm = entry.second->LockForRead(); @@ -4290,6 +4345,7 @@ Status CatalogManager::DeleteNamespace(const DeleteNamespaceRequestPB* req, TRACE("Looking for types in the namespace"); { SharedLock catalog_lock(lock_); + VLOG(3) << __func__ << ": Acquired the catalog manager lock_"; for (const UDTypeInfoMap::value_type& entry : udtype_ids_map_) { auto ltm = entry.second->LockForRead(); @@ -5532,33 +5588,48 @@ void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet, Status CatalogManager::HandleTabletSchemaVersionReport(TabletInfo *tablet, uint32_t version) { // Update the schema version if it's the latest. tablet->set_reported_schema_version(version); + VLOG(1) << "Tablet " << tablet->tablet_id() << " reported version " << version; // Verify if it's the last tablet report, and the alter completed. 
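+  // With index backfill, a completed alter does not always bring the table back to RUNNING:
+  // if an index still needs its permissions advanced or needs backfilling,
+  // LaunchNextTableInfoVersionIfNecessary() below starts the next stage instead.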
TableInfo *table = tablet->table().get(); - auto l = table->LockForWrite(); - if (l->data().pb.state() != SysTablesEntryPB::ALTERING) { - return Status::OK(); - } + { + auto l = table->LockForRead(); + if (l->data().pb.state() != SysTablesEntryPB::ALTERING) { + VLOG(2) << "Table " << table->ToString() << " is not altering"; + return Status::OK(); + } - uint32_t current_version = l->data().pb.version(); - if (table->IsAlterInProgress(current_version)) { - return Status::OK(); + uint32_t current_version = l->data().pb.version(); + if (table->IsAlterInProgress(current_version)) { + VLOG(2) << "Table " << table->ToString() << " has IsAlterInProgress (" + << current_version << ")"; + return Status::OK(); + } } - // Update the state from altering to running and remove the last fully - // applied schema (if it exists). - l->mutable_data()->pb.clear_fully_applied_schema(); - l->mutable_data()->set_state(SysTablesEntryPB::RUNNING, - Substitute("Current schema version=$0", current_version)); + if (!MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary(this, tablet->table())) { + // If there aren't any "next steps" to launch, + // update the state from altering to running and remove the last fully + // applied schema information (if it exists). + auto l = table->LockForWrite(); + uint32_t current_version = l->data().pb.version(); + l->mutable_data()->pb.clear_fully_applied_schema(); + l->mutable_data()->pb.clear_fully_applied_schema_version(); + l->mutable_data()->pb.clear_fully_applied_indexes(); + l->mutable_data()->pb.clear_fully_applied_index_info(); + l->mutable_data()->set_state( + SysTablesEntryPB::RUNNING, Substitute("Current schema version=$0", current_version)); - Status s = sys_catalog_->UpdateItem(table, leader_ready_term_); - if (!s.ok()) { - LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString(); - return s; - } + Status s = sys_catalog_->UpdateItem(table, leader_ready_term_); + if (!s.ok()) { + LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString() + << ". This master may not be the leader anymore."; + return Status::OK(); + } - l->Commit(); - LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version; + l->Commit(); + LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version; + } return Status::OK(); } @@ -5651,6 +5722,7 @@ Status CatalogManager::ProcessPendingAssignments(const TabletInfos& tablets) { TSDescriptorVector ts_descs; master_->ts_manager()->GetAllLiveDescriptors(&ts_descs, blacklistState.tservers_); Status s; + unordered_set ok_status_tables; for (TabletInfo *tablet : deferred.needs_create_rpc) { // NOTE: if we fail to select replicas on the first pass (due to // insufficient Tablet Servers being online), we will still try @@ -5662,11 +5734,20 @@ Status CatalogManager::ProcessPendingAssignments(const TabletInfos& tablets) { tablet->tablet_id(), s.ToString())); tablet->table()->SetCreateTableErrorStatus(s); break; + } else { + ok_status_tables.emplace(tablet->table().get()); } } // Update the sys catalog with the new set of tablets/metadata. if (s.ok()) { + // If any of the ok_status_tables had an error in the previous iterations, we + // need to clear up the error status to reflect that all the create tablets have now + // succeeded. 
+ for (TableInfo* table : ok_status_tables) { + table->SetCreateTableErrorStatus(Status::OK()); + } + s = sys_catalog_->AddAndUpdateItems(deferred.tablets_to_add, deferred.tablets_to_update, leader_ready_term_); @@ -6656,11 +6737,11 @@ Status CatalogManager::GetLoadMoveCompletionPercent(GetLoadMovePercentResponsePB void CatalogManager::AbortAndWaitForAllTasks(const vector>& tables) { for (const auto& t : tables) { - VLOG(1) << "Aborting tasks for table " << t; + VLOG(1) << "Aborting tasks for table " << t->ToString(); t->AbortTasksAndClose(); } for (const auto& t : tables) { - VLOG(1) << "Waiting on Aborting tasks for table " << t; + VLOG(1) << "Waiting on Aborting tasks for table " << t->ToString(); t->WaitTasksCompletion(); } VLOG(1) << "Waiting on Aborting tasks done"; diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index d53afecf1c8f..0310c8224b3d 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -52,7 +52,13 @@ #include "yb/gutil/macros.h" #include "yb/gutil/ref_counted.h" #include "yb/gutil/strings/substitute.h" +#include "yb/gutil/thread_annotations.h" +#include "yb/master/async_rpc_tasks.h" +#include "yb/master/catalog_entity_info.h" #include "yb/master/master_defaults.h" +#include "yb/master/permissions_manager.h" +#include "yb/master/sys_catalog_initialization.h" +#include "yb/master/scoped_leader_shared_lock.h" #include "yb/master/ts_descriptor.h" #include "yb/master/ts_manager.h" #include "yb/master/yql_virtual_table.h" @@ -69,11 +75,6 @@ #include "yb/util/status.h" #include "yb/util/test_macros.h" #include "yb/util/version_tracker.h" -#include "yb/gutil/thread_annotations.h" -#include "yb/master/catalog_entity_info.h" -#include "yb/master/scoped_leader_shared_lock.h" -#include "yb/master/permissions_manager.h" -#include "yb/master/sys_catalog_initialization.h" namespace yb { @@ -633,6 +634,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf { friend class SysConfigLoader; friend class ::yb::master::ScopedLeaderSharedLock; friend class PermissionsManager; + friend class MultiStageAlterTable; + friend class BackfillTable; + friend class BackfillTablet; #define CALL_FRIEND_TEST(...) FRIEND_TEST(__VA_ARGS__) CALL_FRIEND_TEST(pgwrapper::PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBMarkDeleted)); @@ -884,6 +888,12 @@ class CatalogManager : public tserver::TabletPeerLookupIf { void SendCreateTabletRequests(const std::vector& tablets); // Send the "alter table request" to all tablets of the specified table. + // + // Also, initiates the required AlterTable requests to backfill the Index. + // Initially the index is set to be in a INDEX_PERM_DELETE_ONLY state, then + // updated to INDEX_PERM_WRITE_AND_DELETE state; followed by backfilling. Once + // all the tablets have completed backfilling, the index will be updated + // to be in INDEX_PERM_READ_WRITE_AND_DELETE state. void SendAlterTableRequest(const scoped_refptr& table); // Start the background task to send the AlterTable() RPC to the leader for this diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index f6e905026fb6..1261e98ae4b5 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -291,6 +291,10 @@ message SysTabletsEntryPB { // True if the tablet is colocated. optional bool colocated = 9 [ default = false ]; + + // If the tablet is in the backfilling state, this is used to keep track + // of how far along backfill has completed. Encoded like the partition key. 
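// A small sketch of the permission progression described in the SendAlterTableRequest comment
// above: the index first accepts only deletes, then writes and deletes, then the backfill
// runs, and only afterwards are reads allowed. This is a standalone model; the enum mirrors
// just the permission names mentioned here and NextStep is illustrative.
#include <iostream>

enum class IndexPerm { kDeleteOnly, kWriteAndDelete, kBackfilling, kReadWriteAndDelete };

IndexPerm NextStep(IndexPerm p, bool backfill_succeeded) {
  switch (p) {
    case IndexPerm::kDeleteOnly:         return IndexPerm::kWriteAndDelete;
    case IndexPerm::kWriteAndDelete:     return IndexPerm::kBackfilling;
    case IndexPerm::kBackfilling:
      // Only after every tablet of the indexed table finishes backfilling does the
      // index become readable.
      return backfill_succeeded ? IndexPerm::kReadWriteAndDelete : IndexPerm::kBackfilling;
    case IndexPerm::kReadWriteAndDelete: return p;
  }
  return p;
}

int main() {
  IndexPerm p = IndexPerm::kDeleteOnly;
  while (p != IndexPerm::kReadWriteAndDelete) {
    p = NextStep(p, /* backfill_succeeded = */ true);
    std::cout << static_cast<int>(p) << "\n";  // prints 1, 2, 3
  }
}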
+ optional bytes backfilled_until = 10; } // The on-disk entry in the sys.catalog table ("metadata" column) for @@ -360,6 +364,13 @@ message SysTablesEntryPB { optional uint32 wal_retention_secs = 24; optional bool colocated = 25 [ default = false ]; // Is this a colocated table? + + // For index backfill/schema related changes. + // When a schema change is going on, we want to make sure that the master returns the "older" + // fully applied version until all the tablets have moved on to the newer version. + optional uint32 fully_applied_schema_version = 23; + repeated IndexInfoPB fully_applied_indexes = 26; + optional IndexInfoPB fully_applied_index_info = 27; } // The data part of a SysRowEntry in the sys.catalog table for a namespace. diff --git a/src/yb/master/sys_catalog.cc b/src/yb/master/sys_catalog.cc index e9bff4d15b4e..36a0e12a4902 100644 --- a/src/yb/master/sys_catalog.cc +++ b/src/yb/master/sys_catalog.cc @@ -724,8 +724,8 @@ Status SysCatalogTable::CopyPgsqlTable(const TableId& source_table_id, const tablet::TableInfo* target_table_info = VERIFY_RESULT(meta->GetTableInfo(target_table_id)); const Schema source_projection = source_table_info->schema.CopyWithoutColumnIds(); - std::unique_ptr iter = - VERIFY_RESULT(tablet->NewRowIterator(source_projection, boost::none, source_table_id)); + std::unique_ptr iter = VERIFY_RESULT( + tablet->NewRowIterator(source_projection, boost::none, {}, source_table_id)); QLTableRow source_row; std::unique_ptr writer = NewWriter(leader_term); while (VERIFY_RESULT(iter->HasNext())) { @@ -762,8 +762,8 @@ Status SysCatalogTable::CopyPgsqlTables( const tablet::TableInfo* target_table_info = VERIFY_RESULT(meta->GetTableInfo(target_table_id)); const Schema source_projection = source_table_info->schema.CopyWithoutColumnIds(); - std::unique_ptr iter = - VERIFY_RESULT(tablet->NewRowIterator(source_projection, boost::none, source_table_id)); + std::unique_ptr iter = VERIFY_RESULT( + tablet->NewRowIterator(source_projection, boost::none, {}, source_table_id)); QLTableRow source_row; int count = 0; while (VERIFY_RESULT(iter->HasNext())) { diff --git a/src/yb/master/ts_manager.cc b/src/yb/master/ts_manager.cc index e68370d182c5..eb4c21259551 100644 --- a/src/yb/master/ts_manager.cc +++ b/src/yb/master/ts_manager.cc @@ -178,7 +178,10 @@ void TSManager::GetDescriptors(std::function condi for (const TSDescriptorMap::value_type& entry : servers_by_id_) { const TSDescriptorPtr& ts = entry.second; if (condition(ts)) { + VLOG(1) << " Adding " << yb::ToString(*ts); descs->push_back(ts); + } else { + VLOG(1) << " NOT Adding " << yb::ToString(*ts); } } } diff --git a/src/yb/server/monitored_task.h b/src/yb/server/monitored_task.h index 566a9a5ce822..74eecdeb4070 100644 --- a/src/yb/server/monitored_task.h +++ b/src/yb/server/monitored_task.h @@ -75,6 +75,9 @@ class MonitoredTask : public std::enable_shared_from_this { ASYNC_FLUSH_TABLETS, ASYNC_ADD_TABLE_TO_TABLET, ASYNC_REMOVE_TABLE_FROM_TABLET, + ASYNC_GET_SAFE_TIME, + ASYNC_BACKFILL_TABLET_CHUNK, + ASYNC_BACKFILL_DONE, }; virtual Type type() const = 0; diff --git a/src/yb/tablet/operations/change_metadata_operation.cc b/src/yb/tablet/operations/change_metadata_operation.cc index b518e8519ae5..e325f4774df4 100644 --- a/src/yb/tablet/operations/change_metadata_operation.cc +++ b/src/yb/tablet/operations/change_metadata_operation.cc @@ -141,6 +141,7 @@ Status ChangeMetadataOperation::DoReplicated(int64_t leader_term, Status* comple WAL_RETENTION_SECS, ADD_TABLE, REMOVE_TABLE, + BACKFILL_DONE, }; MetadataChange 
metadata_change = MetadataChange::NONE; @@ -174,6 +175,13 @@ Status ChangeMetadataOperation::DoReplicated(int64_t leader_term, Status* comple } } + if (state()->request()->has_is_backfilling()) { + metadata_change = MetadataChange::NONE; + if (++num_operations == 1) { + metadata_change = MetadataChange::BACKFILL_DONE; + } + } + switch (metadata_change) { case MetadataChange::NONE: return STATUS_FORMAT( @@ -208,6 +216,10 @@ Status ChangeMetadataOperation::DoReplicated(int64_t leader_term, Status* comple << num_operations; RETURN_NOT_OK(tablet->RemoveTable(state()->request()->remove_table_id())); break; + case MetadataChange::BACKFILL_DONE: + DCHECK_EQ(1, num_operations) << "Invalid number of alter operations: " << num_operations; + RETURN_NOT_OK(tablet->MarkBackfillDone(state()->request()->is_backfilling())); + break; } // The schema lock was acquired by Tablet::CreatePreparedChangeMetadata. diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 1420d209e53e..5d7d7bd7b5dd 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -359,6 +359,8 @@ Tablet::Tablet( is_sys_catalog_(is_sys_catalog), txns_enabled_(txns_enabled) { CHECK(schema()->has_column_ids()); + LOG_WITH_PREFIX(INFO) << " Schema version for " << metadata_->table_name() << " is " + << metadata_->schema_version(); if (metric_registry) { MetricEntity::AttributeMap attrs; @@ -805,7 +807,9 @@ Status Tablet::ResetRocksDBs(bool destroy) { } Result> Tablet::NewRowIterator( - const Schema &projection, const boost::optional& transaction_id, + const Schema &projection, + const boost::optional& transaction_id, + const ReadHybridTime read_hybrid_time, const TableId& table_id) const { if (state_ != kOpen) { return STATUS_FORMAT(IllegalState, "Tablet in wrong state: $0", state_); @@ -827,7 +831,9 @@ Result> Tablet::NewRowIterator( auto txn_op_ctx = CreateTransactionOperationContext( transaction_id, schema.table_properties().is_ysql_catalog_table()); - auto read_time = ReadHybridTime::SingleTime(SafeTime(RequireLease::kFalse)); + const auto read_time = + (read_hybrid_time ? read_hybrid_time + : ReadHybridTime::SingleTime(SafeTime(RequireLease::kFalse))); auto result = std::make_unique( std::move(mapped_projection), schema, txn_op_ctx, doc_db(), CoarseTimePoint::max() /* deadline */, read_time, &pending_op_counter_); @@ -838,7 +844,7 @@ Result> Tablet::NewRowIterator( Result> Tablet::NewRowIterator( const TableId& table_id) const { const tablet::TableInfo* table_info = VERIFY_RESULT(metadata_->GetTableInfo(table_id)); - return NewRowIterator(table_info->schema, boost::none, table_id); + return NewRowIterator(table_info->schema, boost::none, {}, table_id); } void Tablet::StartOperation(WriteOperationState* operation_state) { @@ -1639,6 +1645,17 @@ Status Tablet::RemoveTable(const std::string& table_id) { return Status::OK(); } +Status Tablet::MarkBackfillDone(bool done) { + LOG_WITH_PREFIX(INFO) << "Setting backfill as done. 
Current schema " + << metadata_->schema().ToString(); + const vector empty_deleted_cols; + Schema new_schema = metadata_->schema(); + new_schema.SetIsBackfilling(done); + metadata_->SetSchema( + new_schema, metadata_->index_map(), empty_deleted_cols, metadata_->schema_version()); + return metadata_->Flush(); +} + Status Tablet::AlterSchema(ChangeMetadataOperationState *operation_state) { DCHECK(key_schema_.KeyEquals(*DCHECK_NOTNULL(operation_state->schema()))) << "Schema keys cannot be altered"; @@ -1705,6 +1722,19 @@ Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperationState* operation_sta operation_state->ToString()); } +bool Tablet::ShouldRetainDeleteMarkersInMajorCompaction() const { + // If the index table is in the process of being backfilled, then we + // want to retain delete markers until the backfill process is complete. + return !schema()->table_properties().IsBackfilling(); +} + +// Should backfill the indexes for the records contained in this tablet. +// Assume that we are already in the Backfilling mode. +Status Tablet::BackfillIndexes(const std::vector &indexes, + HybridTime read_time) { + return Status::OK(); +} + ScopedPendingOperationPause Tablet::PauseReadWriteOperations() { LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("Tablet $0: Waiting for pending ops to complete", tablet_id())) { diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index bf0c14e9e3b7..30c51badc752 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -210,6 +210,11 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { CHECKED_STATUS EnableCompactions(); + CHECKED_STATUS BackfillIndexes(const std::vector &indexes, + HybridTime read_time); + + bool ShouldRetainDeleteMarkersInMajorCompaction() const; + // Mark that the tablet has finished bootstrapping. // This transitions from kBootstrapping to kOpen state. void MarkFinishedBootstrapping(); @@ -329,7 +334,9 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { // state of this tablet. // The returned iterator is not initialized. Result> NewRowIterator( - const Schema &projection, const boost::optional& transaction_id, + const Schema &projection, + const boost::optional& transaction_id, + const ReadHybridTime read_hybrid_time = {}, const TableId& table_id = "") const; Result> NewRowIterator( const TableId& table_id) const; @@ -352,6 +359,10 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { // Apply the Schema of the specified operation. CHECKED_STATUS AlterSchema(ChangeMetadataOperationState* operation_state); + // Used to update the tablets on the index table that the index has been backfilled. + // This means that major compactions can now garbage collect delete markers. + CHECKED_STATUS MarkBackfillDone(bool done); + // Change wal_retention_secs in the metadata. 
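// The new read_hybrid_time parameter defaults to "unset"; a sketch of the intended fallback,
// with std::optional standing in for ReadHybridTime: an explicitly requested read point (as
// used by backfill, which must read at a fixed snapshot time) wins, otherwise the tablet's
// current safe time is used, as before.
#include <cstdint>
#include <optional>

using HybridTimeRep = uint64_t;

HybridTimeRep ChooseReadTime(std::optional<HybridTimeRep> requested, HybridTimeRep safe_time) {
  // e.g. ChooseReadTime(backfill_snapshot_time, safe_time) vs ChooseReadTime({}, safe_time)
  return requested.value_or(safe_time);
}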
CHECKED_STATUS AlterWalRetentionSecs(ChangeMetadataOperationState* operation_state); diff --git a/src/yb/tablet/tablet_metadata.cc b/src/yb/tablet/tablet_metadata.cc index ec4344d1d619..36d130fa7f02 100644 --- a/src/yb/tablet/tablet_metadata.cc +++ b/src/yb/tablet/tablet_metadata.cc @@ -633,6 +633,9 @@ void RaftGroupMetadata::SetSchema(const Schema& schema, index_map, deleted_cols, version)); + VLOG_WITH_PREFIX(1) << raft_group_id_ << " Updating to Schema version " << version + << " from \n" << yb::ToString(kv_store_.tables[primary_table_id_]) + << " to \n" << yb::ToString(new_table_info); kv_store_.tables[primary_table_id_].swap(new_table_info); if (new_table_info) { kv_store_.old_tables.push_back(std::move(new_table_info)); @@ -689,6 +692,9 @@ void RaftGroupMetadata::AddTable(const std::string& table_id, << new_table_info->ToString() << ", old table info: " << existing_table.ToString(); } } + VLOG_WITH_PREFIX(1) << " Updating to Schema version " << schema_version + << " from \n" << yb::ToString(tables[table_id]) + << " to \n" << yb::ToString(new_table_info); tables[table_id].swap(new_table_info); } diff --git a/src/yb/tablet/tablet_retention_policy.cc b/src/yb/tablet/tablet_retention_policy.cc index ce5d033f208b..603e6a09b49a 100644 --- a/src/yb/tablet/tablet_retention_policy.cc +++ b/src/yb/tablet/tablet_retention_policy.cc @@ -50,11 +50,10 @@ HistoryRetentionDirective TabletRetentionPolicy::GetRetentionDirective() { } } - return { - history_cutoff, - std::move(deleted_before_history_cutoff), - TableTTL(tablet_->metadata()->schema()) - }; + return {history_cutoff, std::move(deleted_before_history_cutoff), + TableTTL(tablet_->metadata()->schema()), + docdb::ShouldRetainDeleteMarkersInMajorCompaction( + tablet_->ShouldRetainDeleteMarkersInMajorCompaction())}; } } // namespace tablet diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index fb6b4708ca80..09bbd083f6e2 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -131,6 +131,9 @@ DEFINE_int32(max_wait_for_safe_time_ms, 5000, "Maximum time in milliseconds to wait for the safe time to advance when trying to " "scan at the given hybrid_time."); +DEFINE_int32(num_concurrent_backfills_allowed, 8, + "Maximum number of concurrent backfill jobs that are allowed to run."); + DEFINE_test_flag(bool, tserver_noop_read_write, false, "Respond NOOP to read/write."); DEFINE_int32(max_stale_read_bound_time_ms, 0, "If we are allowed to read from followers, " @@ -348,6 +351,7 @@ class WriteOperationCompletionCallback : public OperationCompletionCallback { state_(state), clock_(clock), include_trace_(trace) {} void OperationCompleted() override { + VLOG(1) << __PRETTY_FUNCTION__ << " completing with status " << status_; // When we don't need to return any data, we could return success on duplicate request.
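// A standalone sketch of the retention-directive change above: the compaction policy now
// carries an explicit "retain delete markers" bit driven by whether the index tablet has
// finished backfilling. Struct and field names here are illustrative.
#include <cstdint>

struct RetentionDirective {
  uint64_t history_cutoff = 0;
  int64_t table_ttl_ms = -1;           // -1: no table-level TTL
  bool retain_delete_markers = false;  // keep tombstones visible while backfill is pending
};

RetentionDirective MakeDirective(uint64_t history_cutoff, int64_t table_ttl_ms,
                                 bool backfill_done) {
  // While the index is still being backfilled, major compactions must not drop delete
  // markers; otherwise an index entry written by the backfill at an older timestamp could
  // resurrect a row that was deleted in the meantime.
  return RetentionDirective{history_cutoff, table_ttl_ms,
                            /*retain_delete_markers=*/!backfill_done};
}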
if (status_.IsAlreadyPresent() && state_->ql_write_ops()->empty() && @@ -400,6 +404,7 @@ class WriteOperationCompletionCallback : public OperationCompletionCallback { } response_->set_propagated_hybrid_time(clock_->Now().ToUint64()); context_->RespondSuccess(); + VLOG(1) << __PRETTY_FUNCTION__ << " RespondedSuccess"; } private: @@ -458,8 +463,150 @@ TabletServiceImpl::TabletServiceImpl(TabletServerIf* server) } TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server) - : TabletServerAdminServiceIf(server->MetricEnt()), - server_(server) { + : TabletServerAdminServiceIf(server->MetricEnt()), server_(server) {} + +void TabletServiceAdminImpl::BackfillDone( + const ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp, rpc::RpcContext context) { + if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillDone", req, resp, &context)) { + return; + } + DVLOG(3) << "Received BackfillDone RPC: " << req->DebugString(); + + server::UpdateClock(*req, server_->Clock()); + + // For now, we shall only allow this RPC on the leader. + auto tablet = + LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); + if (!tablet) { + return; + } + + auto operation_state = std::make_unique( + tablet.peer->tablet(), tablet.peer->log(), req); + + operation_state->set_completion_callback( + MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); + + // Submit the alter schema op. The RPC will be responded to asynchronously. + tablet.peer->Submit( + std::make_unique(std::move(operation_state)), + tablet.leader_term); +} + +void TabletServiceAdminImpl::GetSafeTime( + const GetSafeTimeRequestPB* req, GetSafeTimeResponsePB* resp, rpc::RpcContext context) { + if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "GetSafeTime", req, resp, &context)) { + return; + } + DVLOG(3) << "Received GetSafeTime RPC: " << req->DebugString(); + + server::UpdateClock(*req, server_->Clock()); + + // For now, we shall only allow this RPC on the leader. + auto tablet = + LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); + if (!tablet) { + return; + } + + HybridTime safe_time = tablet.peer->tablet()->SafeTime(); + resp->set_safe_time(safe_time.ToUint64()); + resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); + DVLOG(3) << "Tablet " << tablet.peer->tablet_id() + << ". returning SafeTime for : " << yb::ToString(safe_time); + + context.RespondSuccess(); +} + +void TabletServiceAdminImpl::BackfillIndex( + const BackfillIndexRequestPB* req, BackfillIndexResponsePB* resp, rpc::RpcContext context) { + if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillIndex", req, resp, &context)) { + return; + } + DVLOG(3) << "Received BackfillIndex RPC: " << req->DebugString(); + + server::UpdateClock(*req, server_->Clock()); + + // For now, we shall only allow this RPC on the leader. + auto tablet = + LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); + if (!tablet) { + return; + } + + uint32_t schema_version = tablet.peer->tablet_metadata()->schema_version(); + // If the current schema is newer than the one in the request reject the request. + if (schema_version != req->schema_version()) { + if (schema_version == 1 + req->schema_version()) { + LOG(WARNING) << "Received BackfillIndex RPC: " << req->DebugString() + << " after we have moved to schema_version = " << schema_version; + // This is possible if this tablet completed the backfill. 
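// The schema-version check above distinguishes "this tablet already finished and bumped its
// schema" from a genuine mismatch. A compact sketch of that decision, with an illustrative
// enum standing in for the RPC error codes:
#include <cstdint>

enum class BackfillVersionCheck { kProceed, kAlreadyDone, kMismatch };

BackfillVersionCheck CheckBackfillVersion(uint32_t tablet_version, uint32_t requested_version) {
  if (tablet_version == requested_version) {
    return BackfillVersionCheck::kProceed;      // versions agree: run the backfill
  }
  if (tablet_version == requested_version + 1) {
    // The tablet completed this backfill before a master failover; the retried request
    // can be acknowledged as a no-op success.
    return BackfillVersionCheck::kAlreadyDone;
  }
  return BackfillVersionCheck::kMismatch;        // reject: schemas have diverged
}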
But the master failed over before + // other tablets could complete. + // The new master is redoing the backfill. We are safe to ignore this request. + context.RespondSuccess(); + } else { + SetupErrorAndRespond( + resp->mutable_error(), STATUS_SUBSTITUTE( + InvalidArgument, "Tablet has a different schema $0 vs $1", + schema_version, req->schema_version()), + TabletServerErrorPB::MISMATCHED_SCHEMA, &context); + } + return; + } + + DVLOG(1) << "Received Backfill Index RPC: " << req->DebugString(); + const auto coarse_start = CoarseMonoClock::Now(); + { + std::unique_lock l(backfill_lock_); + while (num_tablets_backfilling_ >= FLAGS_num_concurrent_backfills_allowed) { + backfill_cond_.wait(l); + } + num_tablets_backfilling_++; + } + auto se = ScopeExit([this] { + std::unique_lock l(this->backfill_lock_); + this->num_tablets_backfilling_--; + this->backfill_cond_.notify_all(); + }); + + const auto coarse_now = CoarseMonoClock::Now(); + const CoarseTimePoint& deadline = context.GetClientDeadline(); + // Don't work on the request if we have had to wait more than 50% + // of the time allocated to us for the RPC. + // Backfill is a costly operation, we do not want to start working + // on it if we expect the client (master) to time out the RPC and + // force us to redo the work. + if (deadline - coarse_now < coarse_now - coarse_start) { + SetupErrorAndRespond( + resp->mutable_error(), + STATUS_SUBSTITUTE( + ServiceUnavailable, "Already running $0 backfill requests.", + FLAGS_num_concurrent_backfills_allowed), + TabletServerErrorPB::UNKNOWN_ERROR, &context); + return; + } + + const IndexMap& index_map = tablet.peer->tablet_metadata()->index_map(); + std::vector indices_to_backfill; + std::vector index_ids; + for (const auto& idx : req->indexes()) { + indices_to_backfill.push_back(index_map.at(idx.table_id())); + index_ids.push_back(index_map.at(idx.table_id()).table_id()); + } + Status s = tablet.peer->tablet()->BackfillIndexes( + indices_to_backfill, HybridTime(req->read_at_hybrid_time())); + DVLOG(1) << "Tablet " << tablet.peer->tablet_id() + << ". Backfilled indices for : " << yb::ToString(index_ids) << " with status " << s; + if (!s.ok()) { + SetupErrorAndRespond( + resp->mutable_error(), s, (s.IsIllegalState() ? TabletServerErrorPB::OPERATION_NOT_SUPPORTED + : TabletServerErrorPB::UNKNOWN_ERROR), + &context); + return; + } + + resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); + context.RespondSuccess(); } void TabletServiceAdminImpl::AlterSchema(const ChangeMetadataRequestPB* req, @@ -468,7 +615,7 @@ void TabletServiceAdminImpl::AlterSchema(const ChangeMetadataRequestPB* req, if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "ChangeMetadata", req, resp, &context)) { return; } - DVLOG(3) << "Received Change Metadata RPC: " << req->DebugString(); + DVLOG(1) << "Received Change Metadata RPC: " << req->DebugString(); server::UpdateClock(*req, server_->Clock()); @@ -478,21 +625,20 @@ void TabletServiceAdminImpl::AlterSchema(const ChangeMetadataRequestPB* req, return; } + Schema tablet_schema = tablet.peer->tablet_metadata()->schema(); uint32_t schema_version = tablet.peer->tablet_metadata()->schema_version(); + // Sanity check, to verify that the tablet should have the same schema + // specified in the request. + Schema req_schema; + Status s = SchemaFromPB(req->schema(), &req_schema); + if (!s.ok()) { + SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::INVALID_SCHEMA, &context); + return; + } // If the schema was already applied, respond as succeeded. 
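// A self-contained sketch of the admission control used above: a counting limit guarded by a
// mutex/condition_variable pair, released on scope exit, plus the "spent more than half the
// deadline waiting" rejection. The BackfillLimiter type is illustrative, not a YB class.
#include <chrono>
#include <condition_variable>
#include <mutex>

class BackfillLimiter {
 public:
  explicit BackfillLimiter(int max_concurrent) : max_concurrent_(max_concurrent) {}

  // Blocks until a slot is free; returns false (and releases the slot) if more than half of
  // the caller's deadline budget was burned while waiting, since the work would likely
  // time out anyway and be retried.
  bool Acquire(std::chrono::steady_clock::time_point deadline) {
    const auto start = std::chrono::steady_clock::now();
    {
      std::unique_lock<std::mutex> l(mu_);
      cond_.wait(l, [this] { return running_ < max_concurrent_; });
      ++running_;
    }
    const auto now = std::chrono::steady_clock::now();
    if (deadline - now < now - start) {  // less than half the budget left
      Release();
      return false;
    }
    return true;
  }

  void Release() {
    std::lock_guard<std::mutex> l(mu_);
    --running_;
    cond_.notify_all();
  }

 private:
  const int max_concurrent_;
  int running_ = 0;
  std::mutex mu_;
  std::condition_variable cond_;
};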
if (!req->has_wal_retention_secs() && schema_version == req->schema_version()) { - // Sanity check, to verify that the tablet should have the same schema - // specified in the request. - Schema req_schema; - Status s = SchemaFromPB(req->schema(), &req_schema); - if (!s.ok()) { - SetupErrorAndRespond(resp->mutable_error(), s, - TabletServerErrorPB::INVALID_SCHEMA, &context); - return; - } - Schema tablet_schema = tablet.peer->tablet_metadata()->schema(); if (req_schema.Equals(tablet_schema)) { context.RespondSuccess(); return; @@ -514,12 +660,23 @@ void TabletServiceAdminImpl::AlterSchema(const ChangeMetadataRequestPB* req, // If the current schema is newer than the one in the request reject the request. if (schema_version > req->schema_version()) { - SetupErrorAndRespond(resp->mutable_error(), - STATUS(InvalidArgument, "Tablet has a newer schema"), - TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, &context); + LOG(ERROR) << "Tablet " << req->tablet_id() << " has a newer schema " + << " version=" << schema_version + << " req->schema_version()=" << req->schema_version() + << "\n current-schema=" << tablet_schema.ToString() + << "\n request-schema=" << req_schema.ToString() << " (wtf?)"; + SetupErrorAndRespond( + resp->mutable_error(), + STATUS_SUBSTITUTE( + InvalidArgument, "Tablet has a newer schema Tab $0. Req $1 vs Existing version : $2", + req->tablet_id(), req->DebugString(), schema_version), + TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, &context); return; } + VLOG(1) << "Tablet updating schema from " + << " version=" << schema_version << " current-schema=" << tablet_schema.ToString() + << " to request-schema=" << req_schema.ToString(); auto operation_state = std::make_unique( tablet.peer->tablet(), tablet.peer->log(), req); diff --git a/src/yb/tserver/tablet_service.h b/src/yb/tserver/tablet_service.h index e8e6ec74de8a..009fb7f883e8 100644 --- a/src/yb/tserver/tablet_service.h +++ b/src/yb/tserver/tablet_service.h @@ -207,8 +207,30 @@ class TabletServiceAdminImpl : public TabletServerAdminServiceIf { RemoveTableFromTabletResponsePB* resp, rpc::RpcContext context) override; + // Called on the Indexed table to choose time to read. + void GetSafeTime( + const GetSafeTimeRequestPB* req, GetSafeTimeResponsePB* resp, + rpc::RpcContext context) override; + + // Called on the Indexed table to backfill the index table(s). + void BackfillIndex( + const BackfillIndexRequestPB* req, BackfillIndexResponsePB* resp, + rpc::RpcContext context) override; + + // Called on the Index table(s) once the backfill is complete. + void BackfillDone( + const ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp, + rpc::RpcContext context) override; + private: TabletServer* server_; + + // Used to implement wait/signal mechanism for backfill requests. + // Since the number of concurrently allowed backfill requests is + // limited. 
+ mutable std::mutex backfill_lock_; + std::condition_variable backfill_cond_; + std::atomic num_tablets_backfilling_{0}; }; class ConsensusServiceImpl : public consensus::ConsensusServiceIf { diff --git a/src/yb/tserver/tserver_admin.proto b/src/yb/tserver/tserver_admin.proto index e4db0b497b93..ceef3c376043 100644 --- a/src/yb/tserver/tserver_admin.proto +++ b/src/yb/tserver/tserver_admin.proto @@ -62,6 +62,8 @@ message ChangeMetadataRequestPB { optional uint32 wal_retention_secs = 9; optional bytes remove_table_id = 10; + + optional bool is_backfilling = 11; } // This is used to export tablet metadata changes to a protobuf file to be reloaded on a new cluster @@ -76,6 +78,49 @@ message ChangeMetadataResponsePB { optional fixed64 propagated_hybrid_time = 2; } +message GetSafeTimeRequestPB { + // UUID of server this request is addressed to. + optional bytes dest_uuid = 1; + + required bytes tablet_id = 2; + + optional uint32 schema_version = 3; + + optional fixed64 propagated_hybrid_time = 4; +} + +message GetSafeTimeResponsePB { + optional TabletServerErrorPB error = 1; + + optional fixed64 safe_time = 2; + + optional fixed64 propagated_hybrid_time = 3; +} + +message BackfillIndexRequestPB { + // UUID of server this request is addressed to. + optional bytes dest_uuid = 1; + + required bytes tablet_id = 2; + + repeated IndexInfoPB indexes = 3; + + optional uint32 schema_version = 4; + + optional fixed64 read_at_hybrid_time = 5; + + optional bytes start_key = 6; + optional bytes end_key = 7; + + optional fixed64 propagated_hybrid_time = 8; +} + +message BackfillIndexResponsePB { + optional TabletServerErrorPB error = 1; + + optional fixed64 propagated_hybrid_time = 2; +} + message CopartitionTableRequestPB { // UUID of server this request is addressed to. optional bytes dest_uuid = 1; @@ -233,6 +278,16 @@ service TabletServerAdminService { // Alter a tablet's schema. rpc AlterSchema(ChangeMetadataRequestPB) returns (ChangeMetadataResponsePB); + // GetSafeTime API to get the current safe time. + rpc GetSafeTime(GetSafeTimeRequestPB) returns (GetSafeTimeResponsePB); + + // Backfill the index for the specified index tables. Addressed to the indexed + // table. + rpc BackfillIndex(BackfillIndexRequestPB) returns (BackfillIndexResponsePB); + + // Marks an index table as having completed backfilling. + rpc BackfillDone(ChangeMetadataRequestPB) returns (ChangeMetadataResponsePB); + // Create a co-partitioned table in an existing tablet rpc CopartitionTable(CopartitionTableRequestPB) returns (CopartitionTableResponsePB); diff --git a/src/yb/yql/cql/ql/exec/executor.cc b/src/yb/yql/cql/ql/exec/executor.cc index 0aa4627fee02..1ec4346e9ffd 100644 --- a/src/yb/yql/cql/ql/exec/executor.cc +++ b/src/yb/yql/cql/ql/exec/executor.cc @@ -2053,9 +2053,18 @@ Status Executor::AddIndexWriteOps(const PTDmlStmt *tnode, } // Create the write operation for each index and populate it using the original operation. + // CQL does not allow the primary key to be updated, so PK-only index rows will be either + // deleted when the row in the main table is deleted, or it will be inserted into the index + // when a row is inserted into the main table or updated (for a non-pk column). for (const auto& index_table : tnode->pk_only_indexes()) { const IndexInfo* index = VERIFY_RESULT(tnode->table()->index_map().FindIndex(index_table->id())); + const bool index_ready_to_accept = (is_upsert ? index->AllowWrites() : index->AllowDelete()); + if (!index_ready_to_accept) { + // We are in the process of backfilling the index. 
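// The request carries a key range and a fixed read time, which is what allows a backfill to
// be issued in chunks and resumed from a "backfilled_until" style checkpoint after a failure.
// A minimal sketch of such a driver loop; BackfillChunkFn and the callbacks are illustrative
// stand-ins for the RPC, not part of this patch.
#include <cstdint>
#include <functional>
#include <string>

// Returns the last key successfully backfilled within [start_key, end_key), or an empty
// string on failure. In the real system this would be an RPC to the tablet leader.
using BackfillChunkFn =
    std::function<std::string(const std::string& start_key, const std::string& end_key,
                              uint64_t read_at_hybrid_time)>;

bool BackfillRange(const std::string& begin, const std::string& end, uint64_t read_time,
                   const BackfillChunkFn& backfill_chunk,
                   const std::function<void(const std::string&)>& save_checkpoint) {
  std::string next = begin;
  while (next < end) {
    const std::string done_until = backfill_chunk(next, end, read_time);
    if (done_until.empty() || done_until <= next) {
      return false;  // no progress; the caller decides whether to retry
    }
    save_checkpoint(done_until);  // persist progress so a new master can resume here
    next = done_until;
  }
  return true;
}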
It should not be updated with a + // write/delete yet. The backfill stage will update the index for such entries. + continue; + } YBqlWriteOpPtr index_op(is_upsert ? index_table->NewQLInsert() : index_table->NewQLDelete()); index_op->set_writes_primary_row(true); QLWriteRequestPB *index_req = index_op->mutable_request(); diff --git a/src/yb/yql/cql/ql/ptree/pt_select.cc b/src/yb/yql/cql/ql/ptree/pt_select.cc index 49a22088054a..f27aaac7d12d 100644 --- a/src/yb/yql/cql/ql/ptree/pt_select.cc +++ b/src/yb/yql/cql/ql/ptree/pt_select.cc @@ -477,7 +477,9 @@ CHECKED_STATUS PTSelectStmt::AnalyzeIndexes(SemContext *sem_context) { selectivities.reserve(table_->index_map().size() + 1); selectivities.emplace_back(sem_context->PTempMem(), *this); for (const std::pair& index : table_->index_map()) { - selectivities.emplace_back(sem_context->PTempMem(), *this, index.second); + if (index.second.AllowReads()) { + selectivities.emplace_back(sem_context->PTempMem(), *this, index.second); + } } std::sort(selectivities.begin(), selectivities.end(), std::greater()); if (VLOG_IS_ON(3)) {
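// Both query-layer changes above gate index usage on its current permissions: the write path
// skips index maintenance for an index that is not yet accepting that kind of write (the
// backfill will fill it in), and the planner only considers readable indexes. A compact
// standalone model with illustrative names:
#include <vector>

struct IndexState {
  bool allow_reads = false;
  bool allow_writes = false;
  bool allow_deletes = false;
};

// Mirrors the executor-side check: should this DML also update the given index right now?
bool ShouldMaintainIndex(const IndexState& idx, bool is_upsert) {
  return is_upsert ? idx.allow_writes : idx.allow_deletes;
}

// Mirrors the planner-side filter: candidate indexes for serving a SELECT.
std::vector<IndexState> ReadableIndexes(const std::vector<IndexState>& all) {
  std::vector<IndexState> out;
  for (const auto& idx : all) {
    if (idx.allow_reads) out.push_back(idx);
  }
  return out;
}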