From bd1e19ea225c2ac3b808fc03e4a1439c958f0c57 Mon Sep 17 00:00:00 2001 From: Vipul Bansal Date: Mon, 5 Aug 2024 03:00:56 +0000 Subject: [PATCH] [PLAT-14835]: Add extra transient YCQL index tables in xClusterTableConfig during GET calls and metrics. Summary: - Implemented changes to add extra ycql index present on either source or target universe that should be in replication but are not added yet. - Fixed the duplication issue for the table configs due to intermittent table configs. Now, we will notice duplication only when we have 2 new tables that are not part of replication but present independently on the source and target universe. Test Plan: Tested the above changes through the following steps: - Create replication for the YCQL table with no index table and then verify that transient table configs are present with the desired status if they are present on either source and target universe - Verified that we do not duplicate table configs when we drop/add a new table on either universe due to newly transient table configs. Reviewers: hzare, jmak Reviewed By: hzare Subscribers: yugaware Differential Revision: https://phorge.dev.yugabyte.com/D37041 --- .../tasks/XClusterConfigTaskBase.java | 270 ++++++++++++++++-- .../controllers/XClusterConfigController.java | 7 +- 2 files changed, 241 insertions(+), 36 deletions(-) diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java index 2d5372db8c11..7932ba8b42e3 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java @@ -1264,6 +1264,24 @@ public static Map getSourceTableIdTargetTableIdMap( return sourceTableIdTargetTableIdMap; } + /** + * Returns a map that maps target table IDs to source table IDs. + * + * @param sourceTableInfoList A list of source table information. + * @param targetTableInfoList A list of target table information. + * @return A map that maps target table IDs to source table IDs. + */ + public static Map getTargetTableIdToSourceTableIdMap( + List sourceTableInfoList, + List targetTableInfoList) { + log.debug( + "switching the sequence of source and target tables in order to achieve the" + + " targetTableIdToSourceTableIdMap"); + Map targetTableIdToSourceTableIdMap = + getSourceTableIdTargetTableIdMap(targetTableInfoList, sourceTableInfoList); + return targetTableIdToSourceTableIdMap; + } + /** * It assumes table names in both {@code tableInfoListOnSource} and {@code tableInfoListOnTarget} * are unique except for colocated parent table names. @@ -1339,6 +1357,16 @@ public static Map getTableSchemas( return tableSchemaMap; } + /** + * Checks if the table info will contain an indexed table ID. + * + * @param ybSoftwareVersion The version of the YB software. + * @return True if the table info contains an indexed table ID, false otherwise. + */ + public static boolean universeTableInfoContainsIndexedTableId(String ybSoftwareVersion) { + return Util.compareYbVersions("2.21.1.0-b168", ybSoftwareVersion, true) <= 0; + } + /** * It returns a map from main table id to a list of index table ids associated with the main table * in the universe. `getIndexedTableId` was added to YBDB 2.21.1.0-b168. @@ -1351,11 +1379,8 @@ public static Map> getMainTableIndexTablesMap( Collection tableInfoList) { // Ensure it is not called for a universe older than 2.21.1.0-b168. For older version use the // other getMainTableIndexTablesMap method that uses an RPC available in older universes. - if (Util.compareYbVersions( - "2.21.1.0-b168", - universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion, - true) - > 0) { + if (!universeTableInfoContainsIndexedTableId( + universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) { throw new IllegalStateException( "This method is only supported for universes newer than or equal to 2.21.1.0-b168"); } @@ -1783,9 +1808,8 @@ public static void updateReplicationDetailsFromDB( } try { - if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL) - && Arrays.asList(XClusterConfigStatusType.Running, XClusterConfigStatusType.Updating) - .contains(xClusterConfig.getStatus())) { + if (Arrays.asList(XClusterConfigStatusType.Running, XClusterConfigStatusType.Updating) + .contains(xClusterConfig.getStatus())) { addTransientTableConfigs( xClusterConfig, xClusterUniverseService, @@ -2015,6 +2039,8 @@ private static void addTransientTableConfigs( .map(tableInfo -> XClusterConfigTaskBase.getTableId(tableInfo)) .collect(Collectors.toSet()); + // Update the status for tables that were previously being replicated but have been dropped from + // the source. xClusterConfig.getTableDetails().stream() .filter(tableConfig -> tableConfig.getStatus() == XClusterTableConfig.Status.Running) .forEach( @@ -2024,6 +2050,9 @@ private static void addTransientTableConfigs( } }); + // Note: Tables dropped from target are already updated with status + // "UnableToFetch" as part of setReplicationStatus function. + Pair, List> tableConfigs = getXClusterTableConfigNotInReplication( xClusterUniverseService, @@ -2037,15 +2066,55 @@ private static void addTransientTableConfigs( .getFirst() .forEach( tableConfig -> { - xClusterConfig.addTableConfig(tableConfig); + Optional existingTableConfig = + xClusterConfig.getTableDetails().stream() + .filter(t -> t.getTableId().equals(tableConfig.getTableId())) + .findFirst(); + if (!existingTableConfig.isPresent()) { + xClusterConfig.addTableConfig(tableConfig); + } else { + log.info( + "Found table {} with status {} on source universe but already exists in" + + " xCluster config in YBA", + tableConfig.getTableId(), + tableConfig.getStatus()); + } }); - tableConfigs - .getSecond() - .forEach( - tableConfig -> { - xClusterConfig.addTableConfig(tableConfig); - }); + if (tableConfigs.getSecond().size() > 0) { + Map targetTableIdToSourceTableIdMap = + getTargetTableIdToSourceTableIdMap(sourceTableInfoList, targetTableInfoList); + + tableConfigs + .getSecond() + .forEach( + tableConfig -> { + String targetTableId = tableConfig.getTableId(); + String sourceTableId = targetTableIdToSourceTableIdMap.get(targetTableId); + if (sourceTableId != null) { + Optional existingTableConfig = + xClusterConfig.getTableDetails().stream() + .filter(t -> t.getTableId().equals(sourceTableId)) + .findFirst(); + if (!existingTableConfig.isPresent() + || existingTableConfig + .get() + .getStatus() + .equals(XClusterTableConfig.Status.ExtraTableOnSource)) { + xClusterConfig.addTableConfig(tableConfig); + } else { + log.info( + "Found table target: {} source: {} with status {} on target universe but" + + " already exists in xCluster config in YBA", + targetTableId, + sourceTableId, + tableConfig.getStatus()); + } + } else { + xClusterConfig.addTableConfig(tableConfig); + } + }); + } } /** @@ -2150,9 +2219,6 @@ private static void addSourceAndTargetTableInfo( List targetTableInfoList) { List targetTableConfigs = new ArrayList<>(); List sourceTableConfigs = new ArrayList<>(); - if (!xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)) { - return new Pair<>(sourceTableConfigs, targetTableConfigs); - } targetTableConfigs = getTargetOnlyTable( @@ -2178,13 +2244,15 @@ public static List getTargetOnlyTable( XClusterConfig xClusterConfig, CatalogEntityInfo.SysClusterConfigEntryPB clusterConfig, List targetTableInfoList) { - try { Set tableIdsInReplicationOnTargetUniverse = getConsumerTableIdsFromClusterConfig( clusterConfig, xClusterConfig.getReplicationGroupName()); return extractTablesNotInReplication( + xClusterConfig, + xClusterConfig.getTargetUniverseUUID(), + ybClientService, tableIdsInReplicationOnTargetUniverse, targetTableInfoList, XClusterTableConfig.Status.ExtraTableOnTarget); @@ -2208,6 +2276,9 @@ public static List getSourceOnlyTable( clusterConfig, xClusterConfig.getReplicationGroupName()); return extractTablesNotInReplication( + xClusterConfig, + xClusterConfig.getSourceUniverseUUID(), + ybClientService, tableIdsInReplicationOnSourceUniverse, sourceTableInfoList, XClusterTableConfig.Status.ExtraTableOnSource); @@ -2219,23 +2290,160 @@ public static List getSourceOnlyTable( } public static List extractTablesNotInReplication( + XClusterConfig xClusterConfig, + UUID universeUUID, + YBClientService ybClientService, Set tablesIdsInReplication, List allTables, XClusterTableConfig.Status missingTableStatus) { - Set namespaceIdsInReplication = - allTables.stream() - .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) - .map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8()) - .collect(Collectors.toSet()); + Set tableIdsNotInReplication = new HashSet<>(); - Set tableIdsNotInReplication = - allTables.stream() - .filter(tableInfo -> namespaceIdsInReplication.contains(getNamespaceId(tableInfo))) - .filter(tableInfo -> isXClusterSupported(tableInfo)) - .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) - .map(tableInfo -> getTableId(tableInfo)) - .collect(Collectors.toSet()); + if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)) { + // Gather all namespaceIds for tables that are in replication. + Set namespaceIdsInReplication = + allTables.stream() + .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8()) + .collect(Collectors.toSet()); + + // All the tables that is xCluster supported and belong to the namespace that is in + // replication but not in the tablesIdsInReplication are the tables that are not in + // replication. + tableIdsNotInReplication = + allTables.stream() + .filter(tableInfo -> namespaceIdsInReplication.contains(getNamespaceId(tableInfo))) + .filter(tableInfo -> isXClusterSupported(tableInfo)) + .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + } else if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YCQL)) { + Universe universe = Universe.getOrBadRequest(universeUUID); + // On old universes, tableInfo does not contain the field indexedTableId, so we need to gather + // index table ids using tableSchemas. + if (universeTableInfoContainsIndexedTableId( + universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) { + // Gather all index table ids not in replication by parsing each tables and checking if it's + // indexedTableId is in replication. + tableIdsNotInReplication = + allTables.stream() + .filter( + tableInfo -> + tableInfo.getRelationType().equals(RelationType.INDEX_TABLE_RELATION)) + .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) + .filter( + tableInfo -> + tableInfo.getIndexedTableId() != null + && tablesIdsInReplication.contains(tableInfo.getIndexedTableId())) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + + // Extract main table ids for tables that are not in replication but their index tables are + // in replication. + Set mainTableIdsNotInReplicationWithIndexInReplication = + allTables.stream() + .filter(tableInfo -> TableInfoUtil.isIndexTable(tableInfo)) + .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) + .filter( + tableInfo -> !tablesIdsInReplication.contains(tableInfo.getIndexedTableId())) + .map(tableInfo -> tableInfo.getIndexedTableId()) + .collect(Collectors.toSet()); + + if (mainTableIdsNotInReplicationWithIndexInReplication.size() > 0) { + List + mainTableIdsNotInReplicationWithIndexInReplicationInfo = + allTables.stream() + .filter( + tableInfo -> + mainTableIdsNotInReplicationWithIndexInReplication.contains( + getTableId(tableInfo))) + .collect(Collectors.toList()); + + Map> mainTableIdToIndexTableIdsNotInReplicationMap = + getMainTableIndexTablesMap( + universe, mainTableIdsNotInReplicationWithIndexInReplicationInfo); + + // Add main table ids that are not in replication and its indexes which are not + // in replication. + for (Map.Entry> entry : + mainTableIdToIndexTableIdsNotInReplicationMap.entrySet()) { + boolean indexTableInReplication = + entry.getValue().stream() + .anyMatch(indexTableId -> tablesIdsInReplication.contains(indexTableId)); + if (indexTableInReplication) { + tableIdsNotInReplication.add(entry.getKey()); + for (String indexTableId : entry.getValue()) { + if (!tablesIdsInReplication.contains(indexTableId)) { + tableIdsNotInReplication.add(indexTableId); + } + } + } + } + } + + } else { + // Gather all main table ids for tables that are in replication. + Set mainTableIdsInReplication = + allTables.stream() + .filter(tableInfo -> !TableInfoUtil.isIndexTable(tableInfo)) + .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + + // Index table ids which has its main table in replication. + Set indexTableIdsWithMainTableIdsInReplication = new HashSet<>(); + + // Gather all index table ids for tables that are in replication and add index table + // ids that are not in replication. + Map> mainTableIdToIndexTableIdsInReplicationMap = + getMainTableIndexTablesMap(ybClientService, universe, mainTableIdsInReplication); + for (List indexTableIdsList : mainTableIdToIndexTableIdsInReplicationMap.values()) { + for (String indexTableId : indexTableIdsList) { + if (!tablesIdsInReplication.contains(indexTableId)) { + tableIdsNotInReplication.add(indexTableId); + } + indexTableIdsWithMainTableIdsInReplication.add(indexTableId); + } + } + + // Index tables that are in replication but their main tables are not in replication. + Set indexTableIdsInReplicationWithMissingMainTable = + tablesIdsInReplication.stream() + .filter(tableId -> !indexTableIdsWithMainTableIdsInReplication.contains(tableId)) + .collect(Collectors.toSet()); + if (indexTableIdsInReplicationWithMissingMainTable.size() > 0) { + // All main tables in a universe which are not in replication. + Set mainTableIdsNotInReplication = + allTables.stream() + .filter(tableInfo -> !TableInfoUtil.isIndexTable(tableInfo)) + .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + Map> mainTableIdToIndexTableIdsNotInReplicationMap = + getMainTableIndexTablesMap(ybClientService, universe, mainTableIdsNotInReplication); + // Search through the indexes of each main table not in replication and if found, + // add the main table and index table to the list of tables not in replication + // if they are not already in replication. + for (Map.Entry> entry : + mainTableIdToIndexTableIdsNotInReplicationMap.entrySet()) { + boolean indexTableInReplication = + entry.getValue().stream() + .anyMatch(indexTableId -> tablesIdsInReplication.contains(indexTableId)); + if (indexTableInReplication) { + tableIdsNotInReplication.add(entry.getKey()); + for (String indexTableId : entry.getValue()) { + if (!tablesIdsInReplication.contains(indexTableId)) { + tableIdsNotInReplication.add(indexTableId); + } + } + } + } + } + } + } else { + throw new IllegalArgumentException( + "Unsupported table type " + xClusterConfig.getTableType() + " for xCluster config"); + } List tableConfigNotInReplication = new ArrayList<>(); for (String tableId : tableIdsNotInReplication) { diff --git a/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java b/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java index 3ab7741d26bf..1a7d5a8204ab 100644 --- a/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java +++ b/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java @@ -1230,11 +1230,8 @@ public Result needBootstrapTable( Map> sourceUniverseMainTableIndexTablesMap; // For universes newer than or equal to 2.21.1.0-b168, we use the following method to improve // performance. - if (Util.compareYbVersions( - "2.21.1.0-b168", - sourceUniverse.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion, - true) - <= 0) { + if (XClusterConfigTaskBase.universeTableInfoContainsIndexedTableId( + sourceUniverse.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) { sourceUniverseMainTableIndexTablesMap = XClusterConfigTaskBase.getMainTableIndexTablesMap(sourceUniverse, sourceTableInfoList); } else {