Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

qos rate limiting #2709

Merged
merged 7 commits into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface AtlasDbFactory {
default KeyValueService createRawKeyValueService(
KeyValueServiceConfig config, Optional<LeaderConfig> leaderConfig) {
return createRawKeyValueService(config, leaderConfig, Optional.empty(), DEFAULT_INITIALIZE_ASYNC,
FakeQosClient.getDefault());
FakeQosClient.INSTANCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.Maps;
import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.keyvalue.cassandra.qos.QosCassandraClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.common.exception.AtlasDbDependencyException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ public void shutdown() {
@VisibleForTesting
static CassandraClientPoolImpl createImplForTest(CassandraKeyValueServiceConfig config,
StartupChecks startupChecks) {
return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.getDefault());
return create(config, startupChecks, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.INSTANCE);
}

public static CassandraClientPool create(CassandraKeyValueServiceConfig config) {
return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.getDefault());
return create(config, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC, FakeQosClient.INSTANCE);
}

public static CassandraClientPool create(CassandraKeyValueServiceConfig config, boolean initializeAsync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private CassandraExpiringKeyValueService(
Optional<LeaderConfig> leaderConfig,
boolean initializeAsync) {
super(LoggerFactory.getLogger(CassandraKeyValueService.class), configManager, compactionManager, leaderConfig,
initializeAsync, FakeQosClient.getDefault());
initializeAsync, FakeQosClient.INSTANCE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,15 @@
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.qos.FakeQosClient;
import com.palantir.atlasdb.qos.QosClient;
import com.palantir.atlasdb.qos.ratelimit.QosAwareThrowables;
import com.palantir.atlasdb.qos.ratelimit.RateLimitExceededException;
import com.palantir.atlasdb.util.AnnotatedCallable;
import com.palantir.atlasdb.util.AnnotationType;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.common.annotation.Idempotent;
import com.palantir.common.base.ClosableIterator;
import com.palantir.common.base.ClosableIterators;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.base.Throwables;
import com.palantir.common.exception.PalantirRuntimeException;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
Expand Down Expand Up @@ -224,7 +225,7 @@ public static CassandraKeyValueService create(
CassandraKeyValueServiceConfigManager configManager,
Optional<LeaderConfig> leaderConfig,
boolean initializeAsync) {
return create(configManager, leaderConfig, initializeAsync, FakeQosClient.getDefault());
return create(configManager, leaderConfig, initializeAsync, FakeQosClient.INSTANCE);
}

public static CassandraKeyValueService create(
Expand All @@ -245,7 +246,7 @@ static CassandraKeyValueService create(
Optional<LeaderConfig> leaderConfig,
Logger log) {
return create(configManager, leaderConfig, log, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC,
FakeQosClient.getDefault());
FakeQosClient.INSTANCE);
}

private static CassandraKeyValueService create(
Expand Down Expand Up @@ -497,7 +498,7 @@ public String toString() {
}
return ImmutableMap.copyOf(result);
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -567,7 +568,7 @@ public Map<Cell, Value> get(TableReference tableRef, Map<Cell, Long> timestampBy
}
return builder.build();
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -795,7 +796,7 @@ private Map<byte[], RowColumnRangeIterator> getRowsColumnRangeIteratorForSingleH
}
return ret;
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -835,7 +836,7 @@ public String toString() {
}
});
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -975,7 +976,7 @@ public void put(final TableReference tableRef, final Map<Cell, byte[]> values, f
try {
putInternal("put", tableRef, KeyValueServices.toConstantTimestampValues(values.entrySet(), timestamp));
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand All @@ -995,7 +996,7 @@ public void putWithTimestamps(TableReference tableRef, Multimap<Cell, Value> val
try {
putInternal("putWithTimestamps", tableRef, values.entries());
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -1284,7 +1285,7 @@ public void truncateTables(final Set<TableReference> tablesToTruncate) {
throw new InsufficientConsistencyException("Truncating tables requires all Cassandra nodes"
+ " to be up and available.");
} catch (TException e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}
}
Expand Down Expand Up @@ -1416,7 +1417,7 @@ public String toString() {
throw new InsufficientConsistencyException("Deleting requires all Cassandra nodes to be up and available.",
e);
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -1635,7 +1636,7 @@ private void dropTablesInternal(final Set<TableReference> tablesToDrop) {
throw new InsufficientConsistencyException(
"Dropping tables requires all Cassandra nodes to be up and available.", e);
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -1761,7 +1762,7 @@ private Map<TableReference, byte[]> filterOutExistingTables(
}
}
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}

return filteredTables;
Expand All @@ -1782,7 +1783,8 @@ private void createTablesInternal(final Map<TableReference, byte[]> tableNamesTo
} catch (TException thriftException) {
if (thriftException.getMessage() != null
&& !thriftException.getMessage().contains("already existing table")) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(thriftException);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(
thriftException);
}
}
}
Expand Down Expand Up @@ -2024,7 +2026,7 @@ private void putMetadataAndMaybeAlterTables(
return null;
});
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -2089,7 +2091,7 @@ public void addGarbageCollectionSentinelValues(TableReference tableRef, Iterable
putInternal("addGarbageCollectionSentinelValues",
tableRef, Iterables.transform(cells, cell -> Maps.immutableEntry(cell, value)));
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -2146,7 +2148,7 @@ public void putUnlessExists(final TableReference tableRef, final Map<Cell, byte[
return null;
});
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -2182,7 +2184,7 @@ public void checkAndSet(final CheckAndSetRequest request) throws CheckAndSetExce
} catch (CheckAndSetException e) {
throw e;
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down Expand Up @@ -2470,8 +2472,11 @@ private <V> List<V> runAllTasksCancelOnFailure(List<Callable<V>> tasks) {
try {
//Callable<Void> returns null, so can't use immutable list
return Collections.singletonList(tasks.get(0).call());
} catch (RateLimitExceededException e) {
// Prioritise over
throw e;
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand All @@ -2486,7 +2491,7 @@ private <V> List<V> runAllTasksCancelOnFailure(List<Callable<V>> tasks) {
}
return results;
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
} finally {
for (Future<V> future : futures) {
future.cancel(true);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPool;
import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner;
import com.palantir.atlasdb.qos.ratelimit.QosAwareThrowables;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.base.Throwables;

public class RowGetter {
private CassandraClientPool clientPool;
Expand Down Expand Up @@ -69,7 +69,7 @@ public List<KeySlice> apply(CassandraClient client) throws Exception {
throw new InsufficientConsistencyException("get_range_slices requires " + consistency
+ " Cassandra nodes to be up and available.", e);
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
throw QosAwareThrowables.unwrapAndThrowRateLimitExceededOrAtlasDbDependencyException(e);
}
}

Expand Down
Loading