Skip to content

Commit

Permalink
[PLAT-14835]: Add extra transient YCQL index tables in xClusterTableC…
Browse files Browse the repository at this point in the history
…onfig during GET calls and metrics.

Summary:
 - Implemented changes to add extra ycql index present on either source or target universe that should be in replication but are not added yet.
 - Fixed the duplication issue for the table configs due to intermittent table configs. Now, we will notice duplication only when we have 2 new tables that are not part of replication but present independently on the source and target universe.

Test Plan:
Tested the above changes through the following steps:
 - Create replication for the YCQL table with no index table and then verify that transient table configs are present with the desired status if they are present on either source and target universe
 - Verified that we do not duplicate table configs when we drop/add a new table on either universe due to newly transient table configs.

Reviewers: hzare, jmak

Reviewed By: hzare

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D37041
  • Loading branch information
vipul-yb committed Aug 12, 2024
1 parent 116f846 commit bd1e19e
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,24 @@ public static Map<String, String> getSourceTableIdTargetTableIdMap(
return sourceTableIdTargetTableIdMap;
}

/**
* Returns a map that maps target table IDs to source table IDs.
*
* @param sourceTableInfoList A list of source table information.
* @param targetTableInfoList A list of target table information.
* @return A map that maps target table IDs to source table IDs.
*/
public static Map<String, String> getTargetTableIdToSourceTableIdMap(
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> sourceTableInfoList,
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> targetTableInfoList) {
log.debug(
"switching the sequence of source and target tables in order to achieve the"
+ " targetTableIdToSourceTableIdMap");
Map<String, String> targetTableIdToSourceTableIdMap =
getSourceTableIdTargetTableIdMap(targetTableInfoList, sourceTableInfoList);
return targetTableIdToSourceTableIdMap;
}

/**
* It assumes table names in both {@code tableInfoListOnSource} and {@code tableInfoListOnTarget}
* are unique except for colocated parent table names.
Expand Down Expand Up @@ -1339,6 +1357,16 @@ public static Map<String, GetTableSchemaResponse> getTableSchemas(
return tableSchemaMap;
}

/**
* Checks if the table info will contain an indexed table ID.
*
* @param ybSoftwareVersion The version of the YB software.
* @return True if the table info contains an indexed table ID, false otherwise.
*/
public static boolean universeTableInfoContainsIndexedTableId(String ybSoftwareVersion) {
return Util.compareYbVersions("2.21.1.0-b168", ybSoftwareVersion, true) <= 0;
}

/**
* It returns a map from main table id to a list of index table ids associated with the main table
* in the universe. `getIndexedTableId` was added to YBDB 2.21.1.0-b168.
Expand All @@ -1351,11 +1379,8 @@ public static Map<String, List<String>> getMainTableIndexTablesMap(
Collection<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> tableInfoList) {
// Ensure it is not called for a universe older than 2.21.1.0-b168. For older version use the
// other getMainTableIndexTablesMap method that uses an RPC available in older universes.
if (Util.compareYbVersions(
"2.21.1.0-b168",
universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion,
true)
> 0) {
if (!universeTableInfoContainsIndexedTableId(
universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) {
throw new IllegalStateException(
"This method is only supported for universes newer than or equal to 2.21.1.0-b168");
}
Expand Down Expand Up @@ -1783,9 +1808,8 @@ public static void updateReplicationDetailsFromDB(
}

try {
if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)
&& Arrays.asList(XClusterConfigStatusType.Running, XClusterConfigStatusType.Updating)
.contains(xClusterConfig.getStatus())) {
if (Arrays.asList(XClusterConfigStatusType.Running, XClusterConfigStatusType.Updating)
.contains(xClusterConfig.getStatus())) {
addTransientTableConfigs(
xClusterConfig,
xClusterUniverseService,
Expand Down Expand Up @@ -2015,6 +2039,8 @@ private static void addTransientTableConfigs(
.map(tableInfo -> XClusterConfigTaskBase.getTableId(tableInfo))
.collect(Collectors.toSet());

// Update the status for tables that were previously being replicated but have been dropped from
// the source.
xClusterConfig.getTableDetails().stream()
.filter(tableConfig -> tableConfig.getStatus() == XClusterTableConfig.Status.Running)
.forEach(
Expand All @@ -2024,6 +2050,9 @@ private static void addTransientTableConfigs(
}
});

// Note: Tables dropped from target are already updated with status
// "UnableToFetch" as part of setReplicationStatus function.

Pair<List<XClusterTableConfig>, List<XClusterTableConfig>> tableConfigs =
getXClusterTableConfigNotInReplication(
xClusterUniverseService,
Expand All @@ -2037,15 +2066,55 @@ private static void addTransientTableConfigs(
.getFirst()
.forEach(
tableConfig -> {
xClusterConfig.addTableConfig(tableConfig);
Optional<XClusterTableConfig> existingTableConfig =
xClusterConfig.getTableDetails().stream()
.filter(t -> t.getTableId().equals(tableConfig.getTableId()))
.findFirst();
if (!existingTableConfig.isPresent()) {
xClusterConfig.addTableConfig(tableConfig);
} else {
log.info(
"Found table {} with status {} on source universe but already exists in"
+ " xCluster config in YBA",
tableConfig.getTableId(),
tableConfig.getStatus());
}
});

tableConfigs
.getSecond()
.forEach(
tableConfig -> {
xClusterConfig.addTableConfig(tableConfig);
});
if (tableConfigs.getSecond().size() > 0) {
Map<String, String> targetTableIdToSourceTableIdMap =
getTargetTableIdToSourceTableIdMap(sourceTableInfoList, targetTableInfoList);

tableConfigs
.getSecond()
.forEach(
tableConfig -> {
String targetTableId = tableConfig.getTableId();
String sourceTableId = targetTableIdToSourceTableIdMap.get(targetTableId);
if (sourceTableId != null) {
Optional<XClusterTableConfig> existingTableConfig =
xClusterConfig.getTableDetails().stream()
.filter(t -> t.getTableId().equals(sourceTableId))
.findFirst();
if (!existingTableConfig.isPresent()
|| existingTableConfig
.get()
.getStatus()
.equals(XClusterTableConfig.Status.ExtraTableOnSource)) {
xClusterConfig.addTableConfig(tableConfig);
} else {
log.info(
"Found table target: {} source: {} with status {} on target universe but"
+ " already exists in xCluster config in YBA",
targetTableId,
sourceTableId,
tableConfig.getStatus());
}
} else {
xClusterConfig.addTableConfig(tableConfig);
}
});
}
}

/**
Expand Down Expand Up @@ -2150,9 +2219,6 @@ private static void addSourceAndTargetTableInfo(
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> targetTableInfoList) {
List<XClusterTableConfig> targetTableConfigs = new ArrayList<>();
List<XClusterTableConfig> sourceTableConfigs = new ArrayList<>();
if (!xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)) {
return new Pair<>(sourceTableConfigs, targetTableConfigs);
}

targetTableConfigs =
getTargetOnlyTable(
Expand All @@ -2178,13 +2244,15 @@ public static List<XClusterTableConfig> getTargetOnlyTable(
XClusterConfig xClusterConfig,
CatalogEntityInfo.SysClusterConfigEntryPB clusterConfig,
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> targetTableInfoList) {

try {
Set<String> tableIdsInReplicationOnTargetUniverse =
getConsumerTableIdsFromClusterConfig(
clusterConfig, xClusterConfig.getReplicationGroupName());

return extractTablesNotInReplication(
xClusterConfig,
xClusterConfig.getTargetUniverseUUID(),
ybClientService,
tableIdsInReplicationOnTargetUniverse,
targetTableInfoList,
XClusterTableConfig.Status.ExtraTableOnTarget);
Expand All @@ -2208,6 +2276,9 @@ public static List<XClusterTableConfig> getSourceOnlyTable(
clusterConfig, xClusterConfig.getReplicationGroupName());

return extractTablesNotInReplication(
xClusterConfig,
xClusterConfig.getSourceUniverseUUID(),
ybClientService,
tableIdsInReplicationOnSourceUniverse,
sourceTableInfoList,
XClusterTableConfig.Status.ExtraTableOnSource);
Expand All @@ -2219,23 +2290,160 @@ public static List<XClusterTableConfig> getSourceOnlyTable(
}

public static List<XClusterTableConfig> extractTablesNotInReplication(
XClusterConfig xClusterConfig,
UUID universeUUID,
YBClientService ybClientService,
Set<String> tablesIdsInReplication,
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> allTables,
XClusterTableConfig.Status missingTableStatus) {

Set<String> namespaceIdsInReplication =
allTables.stream()
.filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo)))
.map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8())
.collect(Collectors.toSet());
Set<String> tableIdsNotInReplication = new HashSet<>();

Set<String> tableIdsNotInReplication =
allTables.stream()
.filter(tableInfo -> namespaceIdsInReplication.contains(getNamespaceId(tableInfo)))
.filter(tableInfo -> isXClusterSupported(tableInfo))
.filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo)))
.map(tableInfo -> getTableId(tableInfo))
.collect(Collectors.toSet());
if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)) {
// Gather all namespaceIds for tables that are in replication.
Set<String> namespaceIdsInReplication =
allTables.stream()
.filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo)))
.map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8())
.collect(Collectors.toSet());

// All the tables that is xCluster supported and belong to the namespace that is in
// replication but not in the tablesIdsInReplication are the tables that are not in
// replication.
tableIdsNotInReplication =
allTables.stream()
.filter(tableInfo -> namespaceIdsInReplication.contains(getNamespaceId(tableInfo)))
.filter(tableInfo -> isXClusterSupported(tableInfo))
.filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo)))
.map(tableInfo -> getTableId(tableInfo))
.collect(Collectors.toSet());
} else if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YCQL)) {
Universe universe = Universe.getOrBadRequest(universeUUID);
// On old universes, tableInfo does not contain the field indexedTableId, so we need to gather
// index table ids using tableSchemas.
if (universeTableInfoContainsIndexedTableId(
universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) {
// Gather all index table ids not in replication by parsing each tables and checking if it's
// indexedTableId is in replication.
tableIdsNotInReplication =
allTables.stream()
.filter(
tableInfo ->
tableInfo.getRelationType().equals(RelationType.INDEX_TABLE_RELATION))
.filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo)))
.filter(
tableInfo ->
tableInfo.getIndexedTableId() != null
&& tablesIdsInReplication.contains(tableInfo.getIndexedTableId()))
.map(tableInfo -> getTableId(tableInfo))
.collect(Collectors.toSet());

// Extract main table ids for tables that are not in replication but their index tables are
// in replication.
Set<String> mainTableIdsNotInReplicationWithIndexInReplication =
allTables.stream()
.filter(tableInfo -> TableInfoUtil.isIndexTable(tableInfo))
.filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo)))
.filter(
tableInfo -> !tablesIdsInReplication.contains(tableInfo.getIndexedTableId()))
.map(tableInfo -> tableInfo.getIndexedTableId())
.collect(Collectors.toSet());

if (mainTableIdsNotInReplicationWithIndexInReplication.size() > 0) {
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo>
mainTableIdsNotInReplicationWithIndexInReplicationInfo =
allTables.stream()
.filter(
tableInfo ->
mainTableIdsNotInReplicationWithIndexInReplication.contains(
getTableId(tableInfo)))
.collect(Collectors.toList());

Map<String, List<String>> mainTableIdToIndexTableIdsNotInReplicationMap =
getMainTableIndexTablesMap(
universe, mainTableIdsNotInReplicationWithIndexInReplicationInfo);

// Add main table ids that are not in replication and its indexes which are not
// in replication.
for (Map.Entry<String, List<String>> entry :
mainTableIdToIndexTableIdsNotInReplicationMap.entrySet()) {
boolean indexTableInReplication =
entry.getValue().stream()
.anyMatch(indexTableId -> tablesIdsInReplication.contains(indexTableId));
if (indexTableInReplication) {
tableIdsNotInReplication.add(entry.getKey());
for (String indexTableId : entry.getValue()) {
if (!tablesIdsInReplication.contains(indexTableId)) {
tableIdsNotInReplication.add(indexTableId);
}
}
}
}
}

} else {
// Gather all main table ids for tables that are in replication.
Set<String> mainTableIdsInReplication =
allTables.stream()
.filter(tableInfo -> !TableInfoUtil.isIndexTable(tableInfo))
.filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo)))
.map(tableInfo -> getTableId(tableInfo))
.collect(Collectors.toSet());

// Index table ids which has its main table in replication.
Set<String> indexTableIdsWithMainTableIdsInReplication = new HashSet<>();

// Gather all index table ids for tables that are in replication and add index table
// ids that are not in replication.
Map<String, List<String>> mainTableIdToIndexTableIdsInReplicationMap =
getMainTableIndexTablesMap(ybClientService, universe, mainTableIdsInReplication);
for (List<String> indexTableIdsList : mainTableIdToIndexTableIdsInReplicationMap.values()) {
for (String indexTableId : indexTableIdsList) {
if (!tablesIdsInReplication.contains(indexTableId)) {
tableIdsNotInReplication.add(indexTableId);
}
indexTableIdsWithMainTableIdsInReplication.add(indexTableId);
}
}

// Index tables that are in replication but their main tables are not in replication.
Set<String> indexTableIdsInReplicationWithMissingMainTable =
tablesIdsInReplication.stream()
.filter(tableId -> !indexTableIdsWithMainTableIdsInReplication.contains(tableId))
.collect(Collectors.toSet());
if (indexTableIdsInReplicationWithMissingMainTable.size() > 0) {
// All main tables in a universe which are not in replication.
Set<String> mainTableIdsNotInReplication =
allTables.stream()
.filter(tableInfo -> !TableInfoUtil.isIndexTable(tableInfo))
.filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo)))
.map(tableInfo -> getTableId(tableInfo))
.collect(Collectors.toSet());
Map<String, List<String>> mainTableIdToIndexTableIdsNotInReplicationMap =
getMainTableIndexTablesMap(ybClientService, universe, mainTableIdsNotInReplication);
// Search through the indexes of each main table not in replication and if found,
// add the main table and index table to the list of tables not in replication
// if they are not already in replication.
for (Map.Entry<String, List<String>> entry :
mainTableIdToIndexTableIdsNotInReplicationMap.entrySet()) {
boolean indexTableInReplication =
entry.getValue().stream()
.anyMatch(indexTableId -> tablesIdsInReplication.contains(indexTableId));
if (indexTableInReplication) {
tableIdsNotInReplication.add(entry.getKey());
for (String indexTableId : entry.getValue()) {
if (!tablesIdsInReplication.contains(indexTableId)) {
tableIdsNotInReplication.add(indexTableId);
}
}
}
}
}
}
} else {
throw new IllegalArgumentException(
"Unsupported table type " + xClusterConfig.getTableType() + " for xCluster config");
}

List<XClusterTableConfig> tableConfigNotInReplication = new ArrayList<>();
for (String tableId : tableIdsNotInReplication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,11 +1230,8 @@ public Result needBootstrapTable(
Map<String, List<String>> sourceUniverseMainTableIndexTablesMap;
// For universes newer than or equal to 2.21.1.0-b168, we use the following method to improve
// performance.
if (Util.compareYbVersions(
"2.21.1.0-b168",
sourceUniverse.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion,
true)
<= 0) {
if (XClusterConfigTaskBase.universeTableInfoContainsIndexedTableId(
sourceUniverse.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) {
sourceUniverseMainTableIndexTablesMap =
XClusterConfigTaskBase.getMainTableIndexTablesMap(sourceUniverse, sourceTableInfoList);
} else {
Expand Down

0 comments on commit bd1e19e

Please sign in to comment.