diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java index 2d5372db8c11..7932ba8b42e3 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/XClusterConfigTaskBase.java @@ -1264,6 +1264,24 @@ public static Map getSourceTableIdTargetTableIdMap( return sourceTableIdTargetTableIdMap; } + /** + * Returns a map that maps target table IDs to source table IDs. + * + * @param sourceTableInfoList A list of source table information. + * @param targetTableInfoList A list of target table information. + * @return A map that maps target table IDs to source table IDs. + */ + public static Map getTargetTableIdToSourceTableIdMap( + List sourceTableInfoList, + List targetTableInfoList) { + log.debug( + "switching the sequence of source and target tables in order to achieve the" + + " targetTableIdToSourceTableIdMap"); + Map targetTableIdToSourceTableIdMap = + getSourceTableIdTargetTableIdMap(targetTableInfoList, sourceTableInfoList); + return targetTableIdToSourceTableIdMap; + } + /** * It assumes table names in both {@code tableInfoListOnSource} and {@code tableInfoListOnTarget} * are unique except for colocated parent table names. @@ -1339,6 +1357,16 @@ public static Map getTableSchemas( return tableSchemaMap; } + /** + * Checks if the table info will contain an indexed table ID. + * + * @param ybSoftwareVersion The version of the YB software. + * @return True if the table info contains an indexed table ID, false otherwise. + */ + public static boolean universeTableInfoContainsIndexedTableId(String ybSoftwareVersion) { + return Util.compareYbVersions("2.21.1.0-b168", ybSoftwareVersion, true) <= 0; + } + /** * It returns a map from main table id to a list of index table ids associated with the main table * in the universe. `getIndexedTableId` was added to YBDB 2.21.1.0-b168. @@ -1351,11 +1379,8 @@ public static Map> getMainTableIndexTablesMap( Collection tableInfoList) { // Ensure it is not called for a universe older than 2.21.1.0-b168. For older version use the // other getMainTableIndexTablesMap method that uses an RPC available in older universes. - if (Util.compareYbVersions( - "2.21.1.0-b168", - universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion, - true) - > 0) { + if (!universeTableInfoContainsIndexedTableId( + universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) { throw new IllegalStateException( "This method is only supported for universes newer than or equal to 2.21.1.0-b168"); } @@ -1783,9 +1808,8 @@ public static void updateReplicationDetailsFromDB( } try { - if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL) - && Arrays.asList(XClusterConfigStatusType.Running, XClusterConfigStatusType.Updating) - .contains(xClusterConfig.getStatus())) { + if (Arrays.asList(XClusterConfigStatusType.Running, XClusterConfigStatusType.Updating) + .contains(xClusterConfig.getStatus())) { addTransientTableConfigs( xClusterConfig, xClusterUniverseService, @@ -2015,6 +2039,8 @@ private static void addTransientTableConfigs( .map(tableInfo -> XClusterConfigTaskBase.getTableId(tableInfo)) .collect(Collectors.toSet()); + // Update the status for tables that were previously being replicated but have been dropped from + // the source. xClusterConfig.getTableDetails().stream() .filter(tableConfig -> tableConfig.getStatus() == XClusterTableConfig.Status.Running) .forEach( @@ -2024,6 +2050,9 @@ private static void addTransientTableConfigs( } }); + // Note: Tables dropped from target are already updated with status + // "UnableToFetch" as part of setReplicationStatus function. + Pair, List> tableConfigs = getXClusterTableConfigNotInReplication( xClusterUniverseService, @@ -2037,15 +2066,55 @@ private static void addTransientTableConfigs( .getFirst() .forEach( tableConfig -> { - xClusterConfig.addTableConfig(tableConfig); + Optional existingTableConfig = + xClusterConfig.getTableDetails().stream() + .filter(t -> t.getTableId().equals(tableConfig.getTableId())) + .findFirst(); + if (!existingTableConfig.isPresent()) { + xClusterConfig.addTableConfig(tableConfig); + } else { + log.info( + "Found table {} with status {} on source universe but already exists in" + + " xCluster config in YBA", + tableConfig.getTableId(), + tableConfig.getStatus()); + } }); - tableConfigs - .getSecond() - .forEach( - tableConfig -> { - xClusterConfig.addTableConfig(tableConfig); - }); + if (tableConfigs.getSecond().size() > 0) { + Map targetTableIdToSourceTableIdMap = + getTargetTableIdToSourceTableIdMap(sourceTableInfoList, targetTableInfoList); + + tableConfigs + .getSecond() + .forEach( + tableConfig -> { + String targetTableId = tableConfig.getTableId(); + String sourceTableId = targetTableIdToSourceTableIdMap.get(targetTableId); + if (sourceTableId != null) { + Optional existingTableConfig = + xClusterConfig.getTableDetails().stream() + .filter(t -> t.getTableId().equals(sourceTableId)) + .findFirst(); + if (!existingTableConfig.isPresent() + || existingTableConfig + .get() + .getStatus() + .equals(XClusterTableConfig.Status.ExtraTableOnSource)) { + xClusterConfig.addTableConfig(tableConfig); + } else { + log.info( + "Found table target: {} source: {} with status {} on target universe but" + + " already exists in xCluster config in YBA", + targetTableId, + sourceTableId, + tableConfig.getStatus()); + } + } else { + xClusterConfig.addTableConfig(tableConfig); + } + }); + } } /** @@ -2150,9 +2219,6 @@ private static void addSourceAndTargetTableInfo( List targetTableInfoList) { List targetTableConfigs = new ArrayList<>(); List sourceTableConfigs = new ArrayList<>(); - if (!xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)) { - return new Pair<>(sourceTableConfigs, targetTableConfigs); - } targetTableConfigs = getTargetOnlyTable( @@ -2178,13 +2244,15 @@ public static List getTargetOnlyTable( XClusterConfig xClusterConfig, CatalogEntityInfo.SysClusterConfigEntryPB clusterConfig, List targetTableInfoList) { - try { Set tableIdsInReplicationOnTargetUniverse = getConsumerTableIdsFromClusterConfig( clusterConfig, xClusterConfig.getReplicationGroupName()); return extractTablesNotInReplication( + xClusterConfig, + xClusterConfig.getTargetUniverseUUID(), + ybClientService, tableIdsInReplicationOnTargetUniverse, targetTableInfoList, XClusterTableConfig.Status.ExtraTableOnTarget); @@ -2208,6 +2276,9 @@ public static List getSourceOnlyTable( clusterConfig, xClusterConfig.getReplicationGroupName()); return extractTablesNotInReplication( + xClusterConfig, + xClusterConfig.getSourceUniverseUUID(), + ybClientService, tableIdsInReplicationOnSourceUniverse, sourceTableInfoList, XClusterTableConfig.Status.ExtraTableOnSource); @@ -2219,23 +2290,160 @@ public static List getSourceOnlyTable( } public static List extractTablesNotInReplication( + XClusterConfig xClusterConfig, + UUID universeUUID, + YBClientService ybClientService, Set tablesIdsInReplication, List allTables, XClusterTableConfig.Status missingTableStatus) { - Set namespaceIdsInReplication = - allTables.stream() - .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) - .map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8()) - .collect(Collectors.toSet()); + Set tableIdsNotInReplication = new HashSet<>(); - Set tableIdsNotInReplication = - allTables.stream() - .filter(tableInfo -> namespaceIdsInReplication.contains(getNamespaceId(tableInfo))) - .filter(tableInfo -> isXClusterSupported(tableInfo)) - .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) - .map(tableInfo -> getTableId(tableInfo)) - .collect(Collectors.toSet()); + if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YSQL)) { + // Gather all namespaceIds for tables that are in replication. + Set namespaceIdsInReplication = + allTables.stream() + .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8()) + .collect(Collectors.toSet()); + + // All the tables that is xCluster supported and belong to the namespace that is in + // replication but not in the tablesIdsInReplication are the tables that are not in + // replication. + tableIdsNotInReplication = + allTables.stream() + .filter(tableInfo -> namespaceIdsInReplication.contains(getNamespaceId(tableInfo))) + .filter(tableInfo -> isXClusterSupported(tableInfo)) + .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + } else if (xClusterConfig.getTableType().equals(XClusterConfig.TableType.YCQL)) { + Universe universe = Universe.getOrBadRequest(universeUUID); + // On old universes, tableInfo does not contain the field indexedTableId, so we need to gather + // index table ids using tableSchemas. + if (universeTableInfoContainsIndexedTableId( + universe.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) { + // Gather all index table ids not in replication by parsing each tables and checking if it's + // indexedTableId is in replication. + tableIdsNotInReplication = + allTables.stream() + .filter( + tableInfo -> + tableInfo.getRelationType().equals(RelationType.INDEX_TABLE_RELATION)) + .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) + .filter( + tableInfo -> + tableInfo.getIndexedTableId() != null + && tablesIdsInReplication.contains(tableInfo.getIndexedTableId())) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + + // Extract main table ids for tables that are not in replication but their index tables are + // in replication. + Set mainTableIdsNotInReplicationWithIndexInReplication = + allTables.stream() + .filter(tableInfo -> TableInfoUtil.isIndexTable(tableInfo)) + .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) + .filter( + tableInfo -> !tablesIdsInReplication.contains(tableInfo.getIndexedTableId())) + .map(tableInfo -> tableInfo.getIndexedTableId()) + .collect(Collectors.toSet()); + + if (mainTableIdsNotInReplicationWithIndexInReplication.size() > 0) { + List + mainTableIdsNotInReplicationWithIndexInReplicationInfo = + allTables.stream() + .filter( + tableInfo -> + mainTableIdsNotInReplicationWithIndexInReplication.contains( + getTableId(tableInfo))) + .collect(Collectors.toList()); + + Map> mainTableIdToIndexTableIdsNotInReplicationMap = + getMainTableIndexTablesMap( + universe, mainTableIdsNotInReplicationWithIndexInReplicationInfo); + + // Add main table ids that are not in replication and its indexes which are not + // in replication. + for (Map.Entry> entry : + mainTableIdToIndexTableIdsNotInReplicationMap.entrySet()) { + boolean indexTableInReplication = + entry.getValue().stream() + .anyMatch(indexTableId -> tablesIdsInReplication.contains(indexTableId)); + if (indexTableInReplication) { + tableIdsNotInReplication.add(entry.getKey()); + for (String indexTableId : entry.getValue()) { + if (!tablesIdsInReplication.contains(indexTableId)) { + tableIdsNotInReplication.add(indexTableId); + } + } + } + } + } + + } else { + // Gather all main table ids for tables that are in replication. + Set mainTableIdsInReplication = + allTables.stream() + .filter(tableInfo -> !TableInfoUtil.isIndexTable(tableInfo)) + .filter(tableInfo -> tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + + // Index table ids which has its main table in replication. + Set indexTableIdsWithMainTableIdsInReplication = new HashSet<>(); + + // Gather all index table ids for tables that are in replication and add index table + // ids that are not in replication. + Map> mainTableIdToIndexTableIdsInReplicationMap = + getMainTableIndexTablesMap(ybClientService, universe, mainTableIdsInReplication); + for (List indexTableIdsList : mainTableIdToIndexTableIdsInReplicationMap.values()) { + for (String indexTableId : indexTableIdsList) { + if (!tablesIdsInReplication.contains(indexTableId)) { + tableIdsNotInReplication.add(indexTableId); + } + indexTableIdsWithMainTableIdsInReplication.add(indexTableId); + } + } + + // Index tables that are in replication but their main tables are not in replication. + Set indexTableIdsInReplicationWithMissingMainTable = + tablesIdsInReplication.stream() + .filter(tableId -> !indexTableIdsWithMainTableIdsInReplication.contains(tableId)) + .collect(Collectors.toSet()); + if (indexTableIdsInReplicationWithMissingMainTable.size() > 0) { + // All main tables in a universe which are not in replication. + Set mainTableIdsNotInReplication = + allTables.stream() + .filter(tableInfo -> !TableInfoUtil.isIndexTable(tableInfo)) + .filter(tableInfo -> !tablesIdsInReplication.contains(getTableId(tableInfo))) + .map(tableInfo -> getTableId(tableInfo)) + .collect(Collectors.toSet()); + Map> mainTableIdToIndexTableIdsNotInReplicationMap = + getMainTableIndexTablesMap(ybClientService, universe, mainTableIdsNotInReplication); + // Search through the indexes of each main table not in replication and if found, + // add the main table and index table to the list of tables not in replication + // if they are not already in replication. + for (Map.Entry> entry : + mainTableIdToIndexTableIdsNotInReplicationMap.entrySet()) { + boolean indexTableInReplication = + entry.getValue().stream() + .anyMatch(indexTableId -> tablesIdsInReplication.contains(indexTableId)); + if (indexTableInReplication) { + tableIdsNotInReplication.add(entry.getKey()); + for (String indexTableId : entry.getValue()) { + if (!tablesIdsInReplication.contains(indexTableId)) { + tableIdsNotInReplication.add(indexTableId); + } + } + } + } + } + } + } else { + throw new IllegalArgumentException( + "Unsupported table type " + xClusterConfig.getTableType() + " for xCluster config"); + } List tableConfigNotInReplication = new ArrayList<>(); for (String tableId : tableIdsNotInReplication) { diff --git a/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java b/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java index 3ab7741d26bf..1a7d5a8204ab 100644 --- a/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java +++ b/managed/src/main/java/com/yugabyte/yw/controllers/XClusterConfigController.java @@ -1230,11 +1230,8 @@ public Result needBootstrapTable( Map> sourceUniverseMainTableIndexTablesMap; // For universes newer than or equal to 2.21.1.0-b168, we use the following method to improve // performance. - if (Util.compareYbVersions( - "2.21.1.0-b168", - sourceUniverse.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion, - true) - <= 0) { + if (XClusterConfigTaskBase.universeTableInfoContainsIndexedTableId( + sourceUniverse.getUniverseDetails().getPrimaryCluster().userIntent.ybSoftwareVersion)) { sourceUniverseMainTableIndexTablesMap = XClusterConfigTaskBase.getMainTableIndexTablesMap(sourceUniverse, sourceTableInfoList); } else {