diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java index 2a36c147a3c..19d967dbff7 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/CassandraServersConfigs.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; import java.net.InetSocketAddress; @@ -156,6 +157,12 @@ public T accept(Visitor visitor) { } } + public static Set getCqlHosts(CassandraKeyValueServiceConfig config) { + return CassandraServersConfigs.getCqlCapableConfigIfValid(config) + .map(CassandraServersConfigs.CqlCapableConfig::cqlHosts) + .orElseThrow(() -> new SafeIllegalStateException("Attempting to get CQL hosts with thrift config!")); + } + public static Optional getCqlCapableConfigIfValid(CassandraKeyValueServiceConfig config) { return config.servers().accept(new Visitor<>() { @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java index c7c6f50ea6f..9d73322e869 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java @@ -17,45 +17,18 @@ package com.palantir.atlasdb.cassandra.backup; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Statement; -import com.datastax.driver.core.TableMetadata; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; import com.google.common.collect.RangeSet; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; -import com.palantir.atlasdb.cassandra.CassandraServersConfigs; -import com.palantir.atlasdb.cassandra.CassandraServersConfigs.CqlCapableConfig; import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; import com.palantir.atlasdb.keyvalue.cassandra.LightweightOppToken; import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.ClusterFactory; -import com.palantir.common.streams.KeyedStream; -import com.palantir.logsafe.SafeArg; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; -import com.palantir.logsafe.logger.SafeLogger; -import com.palantir.logsafe.logger.SafeLoggerFactory; -import com.palantir.timestamp.FullyBoundedTimestampRange; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; public final class CqlCluster implements Closeable { - private static final SafeLogger log = SafeLoggerFactory.get(CqlCluster.class); - - private static final int LONG_READ_TIMEOUT_MS = (int) TimeUnit.MINUTES.toMillis(2); - // reduce this from default because we run RepairTableTask across N keyspaces at the same time - private static final int SELECT_FETCH_SIZE = 1_000; - private final Cluster cluster; private final CassandraKeyValueServiceConfig config; @@ -75,126 +48,17 @@ public void close() throws IOException { cluster.close(); } - private static Set getHosts(CassandraKeyValueServiceConfig config) { - return CassandraServersConfigs.getCqlCapableConfigIfValid(config) - .map(CqlCapableConfig::cqlHosts) - .orElseThrow(() -> new SafeIllegalStateException("Attempting to get token ranges with thrift config!")); - } - public Map> getTokenRanges(String tableName) { try (CqlSession session = new CqlSession(cluster.connect())) { - CqlMetadata metadata = session.getMetadata(); - String keyspaceName = config.getKeyspaceOrThrow(); - KeyspaceMetadata keyspace = metadata.getKeyspaceMetadata(keyspaceName); - TableMetadata tableMetadata = keyspace.getTable(tableName); - Set partitionTokens = getPartitionTokens(session, tableMetadata); - Map> tokenRangesByNode = - ClusterMetadataUtils.getTokenMapping(getHosts(config), metadata, keyspaceName, partitionTokens); - - if (!partitionTokens.isEmpty() && log.isDebugEnabled()) { - int numTokenRanges = tokenRangesByNode.values().stream() - .mapToInt(ranges -> ranges.asRanges().size()) - .sum(); - - log.debug( - "Identified token ranges requiring repair", - SafeArg.of("keyspace", keyspace), - SafeArg.of("table", tableName), - SafeArg.of("numPartitionKeys", partitionTokens.size()), - SafeArg.of("numTokenRanges", numTokenRanges)); - } - - return tokenRangesByNode; + return new TokenRangeFetcher(session, config).getTokenRange(tableName); } } public Map>> getTransactionsTableRangesForRepair( List transactionsTableInteractions) { try (CqlSession session = new CqlSession(cluster.connect())) { - String keyspaceName = config.getKeyspaceOrThrow(); - CqlMetadata metadata = session.getMetadata(); - - Map> partitionKeysByTable = - getPartitionTokensByTable(session, transactionsTableInteractions, keyspaceName, metadata); - - maybeLogTokenRanges(transactionsTableInteractions, partitionKeysByTable); - - Set hosts = getHosts(config); - return KeyedStream.stream(partitionKeysByTable) - .map(ranges -> ClusterMetadataUtils.getTokenMapping(hosts, metadata, keyspaceName, ranges)) - .collectToMap(); + return new RepairRangeFetcher(session, config) + .getTransactionTableRangesForRepair(transactionsTableInteractions); } } - - private static Map> getPartitionTokensByTable( - CqlSession cqlSession, - List transactionsTableInteractions, - String keyspaceName, - CqlMetadata metadata) { - Multimap interactionsByTable = - Multimaps.index(transactionsTableInteractions, TransactionsTableInteraction::getTransactionsTableName); - return KeyedStream.stream(interactionsByTable.asMap()) - .map(interactionsForTable -> getPartitionsTokenForSingleTransactionsTable( - cqlSession, keyspaceName, metadata, interactionsForTable)) - .collectToMap(); - } - - private static Set getPartitionsTokenForSingleTransactionsTable( - CqlSession cqlSession, - String keyspaceName, - CqlMetadata metadata, - Collection interactions) { - return interactions.stream() - .map(interaction -> - getPartitionTokensForTransactionsTable(cqlSession, keyspaceName, metadata, interaction)) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } - - private static Set getPartitionTokensForTransactionsTable( - CqlSession cqlSession, - String keyspaceName, - CqlMetadata metadata, - TransactionsTableInteraction interaction) { - TableMetadata transactionsTableMetadata = - ClusterMetadataUtils.getTableMetadata(metadata, keyspaceName, interaction.getTransactionsTableName()); - List selectStatements = - interaction.createSelectStatementsForScanningFullTimestampRange(transactionsTableMetadata); - return cqlSession.retrieveRowKeysAtConsistencyAll(selectStatements); - } - - private static void maybeLogTokenRanges( - List transactionsTableInteractions, - Map> partitionKeysByTable) { - if (log.isDebugEnabled()) { - Multimap indexedInteractions = Multimaps.index( - transactionsTableInteractions, TransactionsTableInteraction::getTransactionsTableName); - Multimap loggableTableRanges = - Multimaps.transformValues(indexedInteractions, TransactionsTableInteraction::getTimestampRange); - Map numPartitionKeysByTable = - KeyedStream.stream(partitionKeysByTable).map(Set::size).collectToMap(); - log.debug( - "Identified token ranges requiring repair in the following transactions tables", - SafeArg.of("transactionsTablesWithRanges", loggableTableRanges), - SafeArg.of("numPartitionKeysByTable", numPartitionKeysByTable)); - } - } - - private static Set getPartitionTokens(CqlSession session, TableMetadata tableMetadata) { - return session.retrieveRowKeysAtConsistencyAll(createSelectStatements(tableMetadata)); - } - - private static List createSelectStatements(TableMetadata table) { - return ImmutableList.of(createSelectStatement(table)); - } - - private static Statement createSelectStatement(TableMetadata table) { - return QueryBuilder.select(CassandraConstants.ROW) - // only returns the column that we need instead of all of them, otherwise we get a timeout - .distinct() - .from(table) - .setConsistencyLevel(ConsistencyLevel.ALL) - .setFetchSize(SELECT_FETCH_SIZE) - .setReadTimeoutMillis(LONG_READ_TIMEOUT_MS); - } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/RepairRangeFetcher.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/RepairRangeFetcher.java new file mode 100644 index 00000000000..25a207d5bd1 --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/RepairRangeFetcher.java @@ -0,0 +1,111 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.cassandra.backup; + +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.TableMetadata; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.RangeSet; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.cassandra.CassandraServersConfigs; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; +import com.palantir.atlasdb.keyvalue.cassandra.LightweightOppToken; +import com.palantir.common.streams.KeyedStream; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.timestamp.FullyBoundedTimestampRange; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +final class RepairRangeFetcher { + private static final SafeLogger log = SafeLoggerFactory.get(RepairRangeFetcher.class); + + private final CqlSession cqlSession; + private final CqlMetadata cqlMetadata; + private final CassandraKeyValueServiceConfig config; + + public RepairRangeFetcher(CqlSession cqlSession, CassandraKeyValueServiceConfig config) { + this.cqlSession = cqlSession; + this.cqlMetadata = cqlSession.getMetadata(); + this.config = config; + } + + public Map>> getTransactionTableRangesForRepair( + List transactionsTableInteractions) { + String keyspaceName = config.getKeyspaceOrThrow(); + + Map> partitionKeysByTable = + getPartitionTokensByTable(transactionsTableInteractions, keyspaceName); + + maybeLogTokenRanges(transactionsTableInteractions, partitionKeysByTable); + + Set hosts = CassandraServersConfigs.getCqlHosts(config); + return KeyedStream.stream(partitionKeysByTable) + .map(ranges -> ClusterMetadataUtils.getTokenMapping(hosts, cqlMetadata, keyspaceName, ranges)) + .collectToMap(); + } + + private Map> getPartitionTokensByTable( + List transactionsTableInteractions, String keyspaceName) { + Multimap interactionsByTable = + Multimaps.index(transactionsTableInteractions, TransactionsTableInteraction::getTransactionsTableName); + return KeyedStream.stream(interactionsByTable.asMap()) + .map(interactionsForTable -> + getPartitionsTokenForSingleTransactionsTable(keyspaceName, interactionsForTable)) + .collectToMap(); + } + + private Set getPartitionsTokenForSingleTransactionsTable( + String keyspaceName, Collection interactions) { + return interactions.stream() + .map(interaction -> getPartitionTokensForTransactionsTable(keyspaceName, interaction)) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + private Set getPartitionTokensForTransactionsTable( + String keyspaceName, TransactionsTableInteraction interaction) { + TableMetadata transactionsTableMetadata = ClusterMetadataUtils.getTableMetadata( + cqlMetadata, keyspaceName, interaction.getTransactionsTableName()); + List selectStatements = + interaction.createSelectStatementsForScanningFullTimestampRange(transactionsTableMetadata); + return cqlSession.retrieveRowKeysAtConsistencyAll(selectStatements); + } + + private static void maybeLogTokenRanges( + List transactionsTableInteractions, + Map> partitionKeysByTable) { + if (log.isDebugEnabled()) { + Multimap indexedInteractions = Multimaps.index( + transactionsTableInteractions, TransactionsTableInteraction::getTransactionsTableName); + Multimap loggableTableRanges = + Multimaps.transformValues(indexedInteractions, TransactionsTableInteraction::getTimestampRange); + Map numPartitionKeysByTable = + KeyedStream.stream(partitionKeysByTable).map(Set::size).collectToMap(); + log.debug( + "Identified token ranges requiring repair in the following transactions tables", + SafeArg.of("transactionsTablesWithRanges", loggableTableRanges), + SafeArg.of("numPartitionKeysByTable", numPartitionKeysByTable)); + } + } +} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TokenRangeFetcher.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TokenRangeFetcher.java new file mode 100644 index 00000000000..70ad83f367c --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TokenRangeFetcher.java @@ -0,0 +1,92 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.cassandra.backup; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.RangeSet; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.cassandra.CassandraServersConfigs; +import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; +import com.palantir.atlasdb.keyvalue.cassandra.LightweightOppToken; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +final class TokenRangeFetcher { + private static final SafeLogger log = SafeLoggerFactory.get(TokenRangeFetcher.class); + + private static final int LONG_READ_TIMEOUT_MS = (int) TimeUnit.MINUTES.toMillis(2); + // reduce this from default because we run RepairTableTask across N keyspaces at the same time + private static final int SELECT_FETCH_SIZE = 1_000; + + private final CqlSession cqlSession; + private final CqlMetadata cqlMetadata; + private final CassandraKeyValueServiceConfig config; + + public TokenRangeFetcher(CqlSession cqlSession, CassandraKeyValueServiceConfig config) { + this.config = config; + this.cqlSession = cqlSession; + this.cqlMetadata = cqlSession.getMetadata(); + } + + public Map> getTokenRange(String tableName) { + String keyspaceName = config.getKeyspaceOrThrow(); + KeyspaceMetadata keyspace = cqlMetadata.getKeyspaceMetadata(keyspaceName); + TableMetadata tableMetadata = keyspace.getTable(tableName); + Set partitionTokens = getPartitionTokens(tableMetadata); + Map> tokenRangesByNode = ClusterMetadataUtils.getTokenMapping( + CassandraServersConfigs.getCqlHosts(config), cqlMetadata, keyspaceName, partitionTokens); + + if (!partitionTokens.isEmpty() && log.isDebugEnabled()) { + int numTokenRanges = tokenRangesByNode.values().stream() + .mapToInt(ranges -> ranges.asRanges().size()) + .sum(); + + log.debug( + "Identified token ranges requiring repair", + SafeArg.of("keyspace", keyspace), + SafeArg.of("table", tableName), + SafeArg.of("numPartitionKeys", partitionTokens.size()), + SafeArg.of("numTokenRanges", numTokenRanges)); + } + + return tokenRangesByNode; + } + + private Set getPartitionTokens(TableMetadata tableMetadata) { + return cqlSession.retrieveRowKeysAtConsistencyAll(ImmutableList.of(selectDistinctRowKeys(tableMetadata))); + } + + private static Statement selectDistinctRowKeys(TableMetadata table) { + return QueryBuilder.select(CassandraConstants.ROW) + // only returns the column that we need instead of all of them, otherwise we get a timeout + .distinct() + .from(table) + .setConsistencyLevel(ConsistencyLevel.ALL) + .setFetchSize(SELECT_FETCH_SIZE) + .setReadTimeoutMillis(LONG_READ_TIMEOUT_MS); + } +} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/ClusterFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/ClusterFactory.java index 3b976aa72b0..28b5d26d0ba 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/ClusterFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/ClusterFactory.java @@ -32,10 +32,8 @@ import com.datastax.driver.core.policies.WhiteListPolicy; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.cassandra.CassandraServersConfigs; -import com.palantir.atlasdb.cassandra.CassandraServersConfigs.CqlCapableConfig; import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; import com.palantir.conjure.java.config.ssl.SslSocketFactories; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; import java.net.InetSocketAddress; import java.util.Set; import java.util.function.Supplier; @@ -49,7 +47,7 @@ public ClusterFactory(Supplier cqlClusterBuilderFactory) { } public Cluster constructCluster(CassandraKeyValueServiceConfig config) { - Set hosts = getHosts(config); + Set hosts = CassandraServersConfigs.getCqlHosts(config); return constructCluster(hosts, config); } @@ -74,13 +72,6 @@ public Cluster constructCluster(Set servers, CassandraKeyValu return clusterBuilder.build(); } - private static Set getHosts(CassandraKeyValueServiceConfig config) { - return CassandraServersConfigs.getCqlCapableConfigIfValid(config) - .map(CqlCapableConfig::cqlHosts) - .orElseThrow( - () -> new SafeIllegalStateException("Attempting to set up CqlCluster with thrift config!")); - } - private static Cluster.Builder withSocketOptions( Cluster.Builder clusterBuilder, CassandraKeyValueServiceConfig config) { return clusterBuilder.withSocketOptions(new SocketOptions() diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/DefaultCqlClientFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/DefaultCqlClientFactory.java index 5e4877c1293..786a587f10f 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/DefaultCqlClientFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/client/creation/DefaultCqlClientFactory.java @@ -21,8 +21,6 @@ import com.palantir.atlasdb.cassandra.CassandraServersConfigs; import com.palantir.atlasdb.keyvalue.cassandra.async.CqlClient; import com.palantir.atlasdb.keyvalue.cassandra.async.CqlClientImpl; -import com.palantir.logsafe.logger.SafeLogger; -import com.palantir.logsafe.logger.SafeLoggerFactory; import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; import java.net.InetSocketAddress; import java.util.Optional; @@ -32,8 +30,6 @@ public class DefaultCqlClientFactory implements CqlClientFactory { public static final CqlClientFactory DEFAULT = new DefaultCqlClientFactory(); - private static final SafeLogger log = SafeLoggerFactory.get(DefaultCqlClientFactory.class); - private final Supplier cqlClusterBuilderFactory; public DefaultCqlClientFactory(Supplier cqlClusterBuilderFactory) {